File size: 3,315 Bytes
d832d4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
075da16
 
 
 
 
 
 
 
 
d832d4e
 
 
 
 
 
 
 
075da16
 
 
 
 
 
 
d832d4e
 
 
 
 
075da16
 
 
 
 
 
 
d832d4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
075da16
 
 
 
 
 
 
d832d4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from copy import deepcopy
from typing import Any, Dict

from aiflows.base_flows import SequentialFlow
from aiflows.utils import logging

logging.set_verbosity_debug()
log = logging.get_logger(__name__)

class InteractiveCodeGenFlow(SequentialFlow):
    """This flow writes code in an interactive manner. It is a sequential flow composed of:
    1. MemoryReading: reads in the code library.
    2. CodeGenerator: generates code based on the goal and functions in the code library.
    3. CodeFileEditor: writes the generated code to a temp file for the user to see, edit and provide feedback.
    4. ParseFeedback: opens up the temp file with vscode and parses the feedback from the user.
    
    *Input Interface*:
    - `goal`
    
    *Output Interface*:
    - `code`
    - `feedback`
    - `temp_code_file_location`


    *Configuration Parameters*:
    - `input_interface`: the input interface of the flow.
    - `output_interface`: the output interface of the flow.
    - `subflows_config`: the configuration of the subflows.
    - `early_exit_key`: the key in the state dict that indicates whether the flow should exit early.
    - `topology`: the topology of the subflows.

    """
    REQUIRED_KEYS_CONFIG = ["max_rounds", "early_exit_key", "topology", "memory_files"]

    def __init__(
            self,
            memory_files: Dict[str, Any],
            **kwargs
    ):
        """
        This function initializes the flow.
        :param memory_files: the memory files that are used in the flow.
        :type memory_files: Dict[str, Any]
        :param kwargs: the keyword arguments.
        :type kwargs: Any
        """
        super().__init__(**kwargs)
        self.memory_files = memory_files

    @classmethod
    def instantiate_from_config(cls, config):
        """
        This function instantiates the flow from a configuration.
        :param config: the configuration of the flow.
        :type config: Dict[str, Any]
        :return: the instantiated flow.
        :rtype: Flow
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Set up memory file ~~~
        memory_files = flow_config["memory_files"]
        kwargs.update({"memory_files": memory_files})

        # ~~~ Set up subflows ~~~
        kwargs.update({"subflows": cls._set_up_subflows(flow_config)})

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def run(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        This function runs the flow.
        :param input_data: the input data to the flow.
        :type input_data: Dict[str, Any]
        :return: the output data from the flow.
        :rtype: Dict[str, Any]
        """
        # ~~~ sets the input_data in the flow_state dict ~~~
        self._state_update_dict(update_data=input_data)

        # ~~~ set the memory file to the flow state ~~~
        self._state_update_dict(update_data={"memory_files": self.memory_files})

        max_rounds = self.flow_config.get("max_rounds", 1)
        if max_rounds is None:
            log.info(f"Running {self.flow_config['name']} without `max_rounds` until the early exit condition is met.")

        self._sequential_run(max_rounds=max_rounds)

        output = self._get_output_from_state()

        return output