File size: 2,708 Bytes
e9cd8d3
 
 
 
 
 
 
 
 
 
 
 
5af478e
 
 
 
 
e9cd8d3
 
5af478e
 
 
 
 
 
e9cd8d3
 
 
 
5af478e
 
 
 
 
 
e9cd8d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5af478e
 
 
 
 
 
e9cd8d3
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
from typing import Dict, Any
from aiflows.base_flows.atomic import AtomicFlow
class UpdatePlanAtomicFlow(AtomicFlow):
    """Overwrite the plan file with the updated plan supplied by the controller.

    The controller reflects on the existing plan and passes the updated plan to this flow.
    This design is intended to make the controller more aware of the plan it has. One
    drawback is that the process is then not deterministic.

    *Input Interface*
    - `updated_plan`: the updated plan in string format
    - `memory_files`: a mapping that must contain a `plan` entry holding the location of the plan file

    *Output Interface*
    - `result`: the result of the update plan operation
    - `summary`: a one-line summary of the operation (also returned by `_call`)

    *Configuration Parameters*:
    - `input_interface`: the input interface of the flow, default: `["updated_plan"]`
    - `output_interface`: the output interface of the flow, default: `["result"]`

    """

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """Check that the input data contains the location of the plan file.

        :param input_data: the input data to the flow
        :type input_data: Dict[str, Any]
        :raises AssertionError: if `memory_files` is missing from the input data
        :raises AssertionError: if `memory_files` has no `plan` entry
        """
        # Raise explicitly instead of using `assert`: assert statements are removed when
        # Python runs with -O, which would silently disable this validation. The exception
        # type and messages are kept identical to the assert-based version.
        if "memory_files" not in input_data:
            raise AssertionError("memory_files not passed to UpdatePlanAtomicFlow.yaml")
        if "plan" not in input_data["memory_files"]:
            raise AssertionError("plan not in memory_files")

    def _call(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Write the updated plan to the plan file, replacing its previous content.

        Any failure (missing `updated_plan`, non-string plan, I/O error) is reported in the
        returned dictionary instead of being raised.

        :param input_data: the input data to the flow
        :type input_data: Dict[str, Any]
        :return: a dictionary with the keys `result` and `summary`
        :rtype: Dict[str, Any]
        """
        try:
            plan_file_location = input_data["memory_files"]["plan"]
            plan_to_write = input_data["updated_plan"]
            # Mode 'w' truncates the file, so the previous plan is fully replaced.
            # NOTE(review): the file is opened with the platform default encoding; confirm
            # it matches whatever reads the plan file before pinning an explicit encoding.
            with open(plan_file_location, 'w') as file:
                file.write(plan_to_write + "\n")
            return {
                "result": "updated plan saved to the plan file and has overriden the previous plan",
                "summary": f"Coder/UpdatePlanFlow: updated plan saved to {plan_file_location}"
            }
        except Exception as e:
            # Deliberately best-effort: the error is handed back as the flow's output
            # rather than propagated to the caller.
            return {
                "result": f"Error occurred: {str(e)}",
                "summary": f"Coder/UpdatePlanFlow: error occurred while writing updated plan: {str(e)}"
            }

    def run(
            self,
            input_data: Dict[str, Any]
    ):
        """Validate the input data, then write the updated plan to the plan file.

        :param input_data: the input data to the flow
        :type input_data: Dict[str, Any]
        :return: the output data of the flow
        :rtype: Dict[str, Any]
        :raises AssertionError: if the input data does not contain `memory_files` with a `plan` entry
        """
        self._check_input(input_data)
        return self._call(input_data)