CoderFlowModule / UpdatePlanAtomicFlow.py
Tachi67's picture
add more comments & docs
5af478e verified
from typing import Dict, Any
from aiflows.base_flows.atomic import AtomicFlow
class UpdatePlanAtomicFlow(AtomicFlow):
"""This flow updates the plan file with the updated plan. The controller should pass the updated plan to this flow.
This design (controller reflect on the existing plan--update plan) is intended to let the controller more aware of the
plan it has. However one setback is that this process in then not deterministic.
*Input Interface*
- `updated_plan`: the updated plan in string format
*Output Interface*
- `result`: the result of the update plan operation
*Configuration Parameters*:
- `input_interface`: the input interface of the flow, default: `["updated_plan"]`
- `output_interface`: the output interface of the flow, default: `["result"]`
"""
def _check_input(self, input_data: Dict[str, Any]):
"""Check if the input data is valid.
:param input_data: the input data to the flow
:type input_data: Dict[str, Any]
:raises AssertionError: if the input data is not valid
:raises AssertionError: if the input data does not contain the required keys
"""
assert "memory_files" in input_data, "memory_files not passed to UpdatePlanAtomicFlow.yaml"
assert "plan" in input_data["memory_files"], "plan not in memory_files"
def _call(self, input_data: Dict[str, Any]):
"""The main function of the flow.
:param input_data: the input data to the flow
:type input_data: Dict[str, Any]
:return: the output data of the flow
:rtype: Dict[str, Any]
"""
try:
plan_file_location = input_data["memory_files"]["plan"]
plan_to_write = input_data["updated_plan"]
with open(plan_file_location, 'w') as file:
file.write(plan_to_write + "\n")
return {
"result": "updated plan saved to the plan file and has overriden the previous plan",
"summary": f"Coder/UpdatePlanFlow: updated plan saved to {plan_file_location}"
}
except Exception as e:
return {
"result": f"Error occurred: {str(e)}",
"summary": f"Coder/UpdatePlanFlow: error occurred while writing updated plan: {str(e)}"
}
def run(
self,
input_data: Dict[str, Any]
):
"""The run function of the flow.
:param input_data: the input data to the flow
:type input_data: Dict[str, Any]
:return: the output data of the flow
:rtype: Dict[str, Any]
"""
self._check_input(input_data)
return self._call(input_data)