| from typing import Dict, Any | |
| from aiflows.base_flows.atomic import AtomicFlow | |
class UpdatePlanAtomicFlow(AtomicFlow):
    """Overwrite the plan file with an updated plan.

    Intended to be called by the controller when it realizes one step of the
    plan is done. The caller supplies the updated plan, which is expected to be
    exactly the same as the old plan except that the finished step is marked
    as done. This flow does not compare the two plans: it writes the text it is
    given, followed by a newline, over the previous file content.

    *Input Interface*:

    - `updated_plan`: the updated plan text to write to the plan file.
    - `memory_files`: a mapping that must contain a `plan` entry holding the
      location of the plan file.

    *Output Interface*:

    - `result`: the result of the operation (success message or error text).
    - `summary`: a one-line summary of what happened.

    *Configuration Parameters*:

    - `input_interface`: the input interface of the atomic flow
    - `output_interface`: the output interface of the atomic flow
    """

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Check that the input data contains the plan file location.

        Only `memory_files` and its `plan` entry are validated here; a missing
        `updated_plan` is reported by `_call` as an error result instead.

        :param input_data: the input data to be checked
        :type input_data: Dict[str, Any]
        :raises AssertionError: if `memory_files` is missing from the input data
        :raises AssertionError: if `memory_files` has no `plan` entry
        """
        # Raised explicitly instead of using `assert`, so the checks still run
        # when Python is started with -O (which strips assert statements).
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to UpdatePlanAtomicFlow.yaml")
        if "plan" not in input_data["memory_files"]:
            raise AssertionError("plan not in memory_files")

    def _call(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Write the updated plan to the plan file.

        Any exception raised while preparing or writing the plan is caught and
        reported through the returned dictionary rather than propagated.

        :param input_data: the input data; reads `memory_files["plan"]` and
            `updated_plan`
        :type input_data: Dict[str, Any]
        :return: a dictionary with the keys `result` and `summary`
        :rtype: Dict[str, Any]
        """
        try:
            plan_file_location = input_data["memory_files"]["plan"]
            # Build the full text BEFORE opening the file: mode 'w' truncates on
            # open, so a failure here (e.g. a non-string `updated_plan`) must
            # happen while the previous plan is still intact on disk.
            plan_text = input_data["updated_plan"] + "\n"
            # NOTE(review): the file is opened with the platform default text
            # encoding; confirm that this matches whatever reads the plan file.
            with open(plan_file_location, 'w') as file:
                file.write(plan_text)
            return {
                "result": "updated plan saved to the plan file and has overriden the previous plan",
                "summary": f"Jarvis/UpdatePlanFlow: updated plan saved to {plan_file_location}"
            }
        except Exception as e:
            # Deliberate best-effort: failures are handed back to the caller as
            # a normal result so it can react to the error text.
            return {
                "result": f"Error occurred: {str(e)}",
                "summary": f"Jarvis/UpdatePlanFlow: error occurred while writing updated plan: {str(e)}"
            }

    def run(
            self,
            input_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Run the atomic flow: validate the input, then write the plan.

        :param input_data: the input data
        :type input_data: Dict[str, Any]
        :return: the result of the operation, as produced by `_call`
        :rtype: Dict[str, Any]
        :raises AssertionError: if the plan file location is missing from the
            input data
        """
        self._check_input(input_data)
        return self._call(input_data)