File size: 3,459 Bytes
f907a92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import traceback
from copy import deepcopy
from typing import Dict, Any

from interpreter.code_interpreters.create_code_interpreter import create_code_interpreter
from interpreter.utils.truncate_output import truncate_output

from flows.base_flows import AtomicFlow


class InterpreterAtomicFlow(AtomicFlow):
    """Atomic flow that executes a code snippet with a per-language interpreter.

    The flow expects input data carrying a ``language`` and a ``code`` entry,
    obtains an interpreter for that language via ``create_code_interpreter``
    (cached per language on the instance), runs the code and returns the
    collected output under the ``"interpreter_output"`` key.
    """

    def __init__(self,
                 max_output: int = 2000,
                 **kwargs):
        """Initialise the flow.

        Args:
            max_output: limit handed to ``truncate_output`` when collecting
                interpreter output.
            **kwargs: forwarded unchanged to ``AtomicFlow.__init__``.
        """
        super().__init__(**kwargs)
        self.max_output = max_output
        # One interpreter per language, created on first use and then reused.
        self._code_interpreters = {}

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow instance from ``config``.

        The config is deep-copied so the instance never shares mutable state
        with the caller's object, and is passed on as ``flow_config``.
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def set_up_flow_state(self):
        """
        class-specific flow state: language and code,
        which describes the programming language and the code to run.
        """
        super().set_up_flow_state()
        self.flow_state["language"] = None
        self.flow_state["code"] = ""

    def _state_update_add_language_and_code(self,
                                            language: str,
                                            code: str) -> None:
        """
        updates the language and code passed from _process_input_data
        to the flow state
        """
        self.flow_state["language"] = language
        self.flow_state["code"] = code

    def _process_input_data(self, input_data: Dict[str, Any]) -> None:
        """Validate the input, allocate an interpreter and update the flow state.

        Args:
            input_data: must contain the keys ``"language"`` and ``"code"``.
                The mapping itself is not modified.

        Raises:
            AssertionError: if ``"language"`` or ``"code"`` is missing.
        """
        # ~~~ Sanity check of input_data ~~~
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type and messages are
        # kept so existing callers observe the same failure.
        for required_key in ("language", "code"):
            if required_key not in input_data:
                raise AssertionError(
                    f"attribute '{required_key}' not in input data."
                )

        # Work on locals so the caller's dict is left untouched.
        language = input_data["language"]
        code = input_data["code"]

        # code in Jupyter notebook that starts with '!' is actually shell command.
        if language == "python" and code.startswith("!"):
            language = "shell"
            code = code[1:]

        # ~~~ Allocate interpreter ~~~
        # interpreter existence is checked in create_code_interpreter()
        # TODO: consider: should we put language not supported error into output?
        if language not in self._code_interpreters:
            self._code_interpreters[language] = create_code_interpreter(language)

        # ~~~ Pass input data to flow state ~~~
        self._state_update_add_language_and_code(
            language=language,
            code=code
        )

    def _call(self) -> str:
        """Run the code stored in the flow state and return its output.

        Every item yielded by the interpreter's ``run`` that has an
        ``"output"`` entry is appended on a new line; after each append the
        accumulated text is passed through ``truncate_output`` with
        ``self.max_output`` and stripped.

        Returns:
            The accumulated output, or the formatted traceback if anything
            raised an ``Exception`` while looking up or running the
            interpreter.
        """
        output = ""
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            for line in code_interpreter.run(code):
                if "output" in line:
                    output += "\n" + line["output"]
                    # Truncate output
                    output = truncate_output(output, self.max_output)
                    output = output.strip()
        except Exception:
            # Failures are reported as output text rather than raised.
            # Only ``Exception`` is caught so KeyboardInterrupt / SystemExit
            # still propagate instead of being turned into output.
            output = traceback.format_exc()
            output = output.strip()
        return output

    def run(
            self,
            input_data: Dict[str, Any]):
        """Process ``input_data``, execute the code and wrap the result.

        Returns:
            A dict with the single key ``"interpreter_output"`` holding the
            string produced by ``_call``.
        """
        self._process_input_data(input_data)
        response = self._call()
        return {"interpreter_output": response}