File size: 4,372 Bytes
f907a92 803677d f907a92 26273de f907a92 803677d f907a92 c23d3f9 803677d c23d3f9 f907a92 b658ad5 f907a92 b658ad5 f907a92 b658ad5 f907a92 b658ad5 f907a92 b658ad5 f907a92 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 |
import traceback
from copy import deepcopy
from typing import Dict, Any
from .code_interpreters.create_code_interpreter import create_code_interpreter
from aiflows.base_flows import AtomicFlow
def truncate_output(data, max_output_chars=2000):
    """Keep only the tail of ``data`` and prefix it with a truncation notice.

    The function is safe to call repeatedly on its own result: a notice left
    by a previous call (with the same ``max_output_chars``) is removed before
    the length check, and the notice is then re-added exactly once.

    Args:
        data: Text to truncate.
        max_output_chars: Maximum number of trailing characters of ``data``
            to keep. A value of zero or less keeps no characters at all.

    Returns:
        ``data`` unchanged when it fits within the limit and carried no
        previous notice; otherwise the notice followed by the kept tail.
    """
    needs_truncation = False

    message = f'Output truncated. Showing the last {max_output_chars} characters.\n\n'

    # Remove previous truncation message if it exists, and remember that the
    # text was already truncated so the message is put back below.
    if data.startswith(message):
        data = data[len(message):]
        needs_truncation = True

    # If data exceeds max length, truncate it and add message
    if len(data) > max_output_chars or needs_truncation:
        # ``data[-0:]`` is the whole string, not an empty one, so a
        # non-positive limit has to be handled explicitly.
        tail = data[-max_output_chars:] if max_output_chars > 0 else ""
        data = message + tail

    return data
class InterpreterAtomicFlow(AtomicFlow):
    """This flow is used to run the code passed from the caller.

    *Expected Input* (keys read from ``input_data`` by :meth:`run`):

    - `code`: the source code to execute
    - `language`: the programming language of `code`

    *Expected Output*:

    - `interpreter_output`: output of the code interpreter

    Full credits to open-interpreter (https://github.com/KillianLucas/open-interpreter)
    for the usage of code interpreters (package `code_interpreters`) and the function truncate_output()
    """

    def __init__(self,
                 max_output=2000,
                 **kwargs):
        """Create the flow.

        Args:
            max_output: Maximum number of characters of interpreter output
                kept by :meth:`_call` (passed to ``truncate_output``).
            **kwargs: Forwarded unchanged to the ``AtomicFlow`` constructor.
        """
        super().__init__(**kwargs)
        self.max_output = max_output
        # One interpreter per language, created on first use and then reused.
        self._code_interpreters = {}

    @classmethod
    def instantiate_from_config(cls, config):
        """Build a flow from ``config``.

        The config is deep-copied so the new instance does not share it with
        the caller. Only ``flow_config`` is passed to the constructor, so
        ``max_output`` keeps its constructor default.
        """
        flow_config = deepcopy(config)

        kwargs = {"flow_config": flow_config}

        # ~~~ Instantiate flow ~~~
        return cls(**kwargs)

    def set_up_flow_state(self) -> None:
        """
        class-specific flow state: language and code,
        which describes the programming language and the code to run.
        """
        super().set_up_flow_state()
        self.flow_state["language"] = None
        self.flow_state["code"] = ""

    def _state_update_add_language_and_code(self,
                                            language: str,
                                            code: str) -> None:
        """
        updates the language and code passed from _process_input_data
        to the flow state
        """
        self.flow_state["language"] = language
        self.flow_state["code"] = code

    def _check_input(self, input_data: Dict[str, Any]) -> None:
        """
        Sanity check of input data

        Raises:
            AssertionError: if `language` or `code` is missing from
                ``input_data``.
        """
        # ~~~ Sanity check of input_data ~~~
        # Raised explicitly (instead of using ``assert``) so the check still
        # runs under ``python -O``; the exception type and messages are kept
        # for backward compatibility.
        for key in ("language", "code"):
            if key not in input_data:
                raise AssertionError(f"attribute '{key}' not in input data.")

    def _process_input_data(self, input_data: Dict[str, Any]) -> None:
        """
        Allocate interpreter if any, pass input data into flow state

        ``input_data`` itself is only read, never modified.
        """
        language = input_data["language"]
        code = input_data["code"]

        # code in Jupyter notebook that starts with '!' is actually shell command.
        # Work on local copies so the caller's dict is not rewritten.
        if language == "python" and code.startswith("!"):
            language = "shell"
            code = code[1:]

        # ~~~ Allocate interpreter ~~~
        # interpreter existence is checked in create_code_interpreter()
        # TODO: consider: should we put language not supported error into output?
        if language not in self._code_interpreters:
            self._code_interpreters[language] = create_code_interpreter(language)

        # ~~~ Pass input data to flow state ~~~
        self._state_update_add_language_and_code(
            language=language,
            code=code
        )

    def _call(self) -> str:
        """Run the code stored in the flow state and collect its output.

        Returns:
            The stripped, possibly truncated interpreter output, or the
            stripped traceback text if running the code raised an exception.
        """
        output = ""
        try:
            code_interpreter = self._code_interpreters[self.flow_state["language"]]
            code = self.flow_state["code"]
            for line in code_interpreter.run(code):
                if "output" in line:
                    output += "\n" + line["output"]
                    # Truncate output as it arrives so the buffer stays
                    # bounded; truncate_output() handles repeated calls.
                    output = truncate_output(output, self.max_output)
        except Exception:
            # Report failures to the caller as output. ``Exception`` (not a
            # bare ``except``) lets KeyboardInterrupt/SystemExit propagate.
            output = traceback.format_exc()
        return output.strip()

    def run(
            self,
            input_data: Dict[str, Any]) -> Dict[str, Any]:
        """Validate the input, run the code and return the interpreter output.

        Args:
            input_data: Must contain the keys `language` and `code`.

        Returns:
            A dict with the single key `interpreter_output`.
        """
        self._check_input(input_data)
        self._process_input_data(input_data)
        response = self._call()
        return {"interpreter_output": response}
|