Spaces:
Runtime error
Runtime error
File size: 7,759 Bytes
0282efb e1776b1 0282efb e1776b1 0282efb e1776b1 0282efb e1776b1 0282efb e1776b1 0282efb e1776b1 0282efb e1776b1 0282efb 2b0a7db 0282efb 2b0a7db 0282efb 65d2933 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
import uuid, base64, re
from io import BytesIO
from codeboxapi import CodeBox # type: ignore
from codeboxapi.schema import CodeBoxOutput # type: ignore
from langchain.tools import StructuredTool
from langchain.chat_models import ChatOpenAI
from langchain.chat_models.base import BaseChatModel
from langchain.prompts.chat import MessagesPlaceholder
from langchain.agents import AgentExecutor, BaseSingleActionAgent
from langchain.memory import ConversationBufferMemory
from codeinterpreterapi.schemas import CodeInterpreterResponse, CodeInput, File, UserRequest # type: ignore
from codeinterpreterapi.config import settings
from codeinterpreterapi.functions_agent import OpenAIFunctionsAgent
from codeinterpreterapi.prompts import code_interpreter_system_message
from codeinterpreterapi.callbacks import CodeCallbackHandler
from codeinterpreterapi.chains.modifications_check import get_file_modifications
from codeinterpreterapi.chains.remove_download_link import remove_download_link
class CodeInterpreterSession:
    """Async session pairing an OpenAI-functions agent with a CodeBox sandbox.

    The agent is given a single `python` tool that executes code in a remote
    jupyter kernel (CodeBox). Files uploaded by the user are pushed into the
    sandbox; files the generated code writes are downloaded and attached to
    the response.

    Intended usage::

        async with CodeInterpreterSession() as session:
            response = await session.generate_response("plot sin(x)")
    """

    def __init__(self, model=None, openai_api_key=None) -> None:
        # Sandboxed jupyter kernel that executes the agent-generated code.
        self.codebox = CodeBox()
        self.tools: list[StructuredTool] = self._tools()
        self.llm: BaseChatModel = self._llm(model, openai_api_key)
        self.agent_executor: AgentExecutor = self._agent_executor()
        # Files the user uploaded this session / files produced by the sandbox.
        self.input_files: list[File] = []
        self.output_files: list[File] = []

    async def _init(self) -> None:
        """Start the CodeBox sandbox (called by ``__aenter__``)."""
        await self.codebox.astart()

    async def _close(self) -> None:
        """Stop the CodeBox sandbox (called by ``__aexit__``)."""
        await self.codebox.astop()

    def _tools(self) -> list[StructuredTool]:
        """Build the tool list exposed to the agent (currently only `python`)."""
        return [
            StructuredTool(
                name="python",
                description=
                # TODO: variables as context to the agent
                # TODO: current files as context to the agent
                "Input a string of code to a python interpreter (jupyter kernel). "
                "Variables are preserved between runs. ",
                func=self.codebox.run,
                coroutine=self.arun_handler,
                args_schema=CodeInput,
            ),
        ]

    def _llm(self, model: str | None, openai_api_key: str | None) -> BaseChatModel:
        """Create the chat model, defaulting to gpt-4 and the configured API key.

        Raises:
            ValueError: if no API key is passed and none is set in settings.
        """
        if model is None:
            model = "gpt-4"
        if openai_api_key is None:
            if settings.OPENAI_API_KEY is None:
                raise ValueError("OpenAI API key missing.")
            openai_api_key = settings.OPENAI_API_KEY
        return ChatOpenAI(
            temperature=0.03,
            model=model,
            openai_api_key=openai_api_key,
            max_retries=3,
            request_timeout=60 * 3,
        )  # type: ignore

    def _agent(self) -> BaseSingleActionAgent:
        """Build the OpenAI-functions agent with chat-history memory slot."""
        return OpenAIFunctionsAgent.from_llm_and_tools(
            llm=self.llm,
            tools=self.tools,
            system_message=code_interpreter_system_message,
            extra_prompt_messages=[MessagesPlaceholder(variable_name="memory")],
        )

    def _agent_executor(self) -> AgentExecutor:
        """Wrap the agent in an executor with memory and a code callback."""
        return AgentExecutor.from_agent_and_tools(
            agent=self._agent(),
            callbacks=[CodeCallbackHandler(self)],
            max_iterations=9,
            tools=self.tools,
            verbose=settings.VERBOSE,
            memory=ConversationBufferMemory(memory_key="memory", return_messages=True),
        )

    async def show_code(self, code: str) -> None:
        """Callback to surface the code about to run to the user (verbose only)."""
        if settings.VERBOSE:
            print(code)

    def run_handler(self, code: str):
        """Synchronous execution is not implemented; use :meth:`arun_handler`."""
        raise NotImplementedError("Use arun_handler for now.")

    async def arun_handler(self, code: str):
        """Run ``code`` in the sandbox and return its textual output to the agent.

        Side effects: image outputs are decoded (attachment support pending);
        files the code created/modified are downloaded into ``output_files``.
        """
        # TODO: upload files
        output: CodeBoxOutput = await self.codebox.arun(code)
        if not isinstance(output.content, str):
            raise TypeError("Expected output.content to be a string.")
        if output.type == "image/png":
            filename = f"image-{uuid.uuid4()}.png"
            file_buffer = BytesIO(base64.b64decode(output.content))
            file_buffer.name = filename
            # TODO: append to self.output_files once image attachments are supported
            # (was: f-string without placeholder saying "Image (unknown)")
            return f"Image {filename} got sent to the user."
        elif output.type == "error":
            # TODO: check if package install is required
            # TODO: preanalyze error to optimize next code generation
            print("Error:", output.content)
        elif modifications := await get_file_modifications(code, self.llm):
            for filename in modifications:
                # Skip files the user uploaded themselves — only new outputs count.
                if filename in [file.name for file in self.input_files]:
                    continue
                fileb = await self.codebox.adownload(filename)
                if not fileb.content:
                    continue
                file_buffer = BytesIO(fileb.content)
                file_buffer.name = filename
                self.output_files.append(File(name=filename, content=file_buffer.read()))
        return output.content

    async def input_handler(self, request: UserRequest):
        """Upload the request's files into the sandbox and describe them in the prompt."""
        if not request.files:
            return
        if not request.content:
            request.content = "I uploaded, just text me back and confirm that you got the file(s)."
        request.content += "\n**The user uploaded the following files: **\n"
        for file in request.files:
            self.input_files.append(file)
            request.content += f"[Attachment: {file.name}]\n"
            await self.codebox.aupload(file.name, file.content)
        request.content += "**File(s) are now available in the cwd. **\n"

    async def output_handler(self, final_response: str) -> CodeInterpreterResponse:
        """Attach produced files to the response and strip raw markdown links."""
        for file in self.output_files:
            if str(file.name) in final_response:
                # Drop the markdown image embed; the file is attached to the
                # CodeInterpreterResponse object instead.
                final_response = re.sub(r"\n\n!\[.*\]\(.*\)", "", final_response)
        if self.output_files and re.search(r"\n\[.*\]\(.*\)", final_response):
            final_response = await remove_download_link(final_response, self.llm)
        return CodeInterpreterResponse(content=final_response, files=self.output_files)

    async def generate_response(
        self,
        user_msg: str,
        files: list[File] | None = None,  # was a mutable default ([]) shared across calls
        detailed_error: bool = False,
    ) -> CodeInterpreterResponse:
        """Generate a Code Interpreter response based on the user's input.

        Args:
            user_msg: the user's message.
            files: optional files to upload into the sandbox.
            detailed_error: if True, include the exception class and message
                in the response instead of a generic apology.
        """
        user_request = UserRequest(content=user_msg, files=files or [])
        try:
            await self.input_handler(user_request)
            response = await self.agent_executor.arun(input=user_request.content)
            return await self.output_handler(response)
        except Exception as e:
            if settings.VERBOSE:
                import traceback
                traceback.print_exc()
            if detailed_error:
                return CodeInterpreterResponse(
                    content=f"Error in CodeInterpreterSession: {e.__class__.__name__} - {e}"
                )
            # Fixed message: was "something went while generating" and the two
            # literals concatenated without a separating space.
            return CodeInterpreterResponse(
                content="Sorry, something went wrong while generating your response. "
                "Please try again or restart the session."
            )

    async def __aenter__(self) -> "CodeInterpreterSession":
        await self._init()
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        await self._close()
|