from typing_extensions import List, Any, Union
from e2b_code_interpreter import Sandbox
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
import os
import sys
import base64
import gradio as gr
import time
import structlog

log = structlog.get_logger()

# Add the parent of this file's directory to sys.path so the imports below resolve.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))  # not the nicest way of handling this, but oh well...
from generation.analyzer import Analyzer, CodeAct
from retrieval.retriever import Metadata
from utils import generate_system_prompt, get_file_from_title, get_path_from_title


class SimpleAnalyzer(Analyzer):
    """
    Uploads datasets to E2B sandbox, then runs AI-generated code in the sandbox to analyze the data.
    Is able to fix errors in the code and retry the analysis.
    """

    def __init__(self, groupOwner, metadata_docs: List[Metadata], coding_client=None, timeout=60, max_steps=5):
        """
        Create the sandbox, upload the datasets and seed the conversation with a system prompt.

        :param groupOwner: Passed to the base class and to the title-to-file helpers.
        :param metadata_docs: Metadata of the datasets to upload; their titles identify the files.
        :param coding_client: LLM client forwarded to the base class.
        :param timeout: Timeout value handed to the E2B ``Sandbox`` constructor.
        :param max_steps: Maximum number of generate-and-run attempts per call to ``analyze``.
        """
        super().__init__(groupOwner, metadata_docs, coding_client)
        # include_raw=True makes invoke() return a dict; the parsed CodeAct is under "parsed"
        # and the underlying model message under "raw".
        self.coding_llm = self.coding_client.with_structured_output(CodeAct, include_raw=True)
        self.sbx = Sandbox(template="ck9xjl2f121b2synr4f5", api_key=os.environ.get("E2B_API_KEY"), timeout=timeout)
        self.max_steps = max_steps

        setup_code, logs, file_names = self.init_sandbox()
        self.init_conversation(setup_code, logs, file_names)

    def finalize(self):
        """Kill the sandbox."""
        log.info("Killing the sandbox...")
        self.sbx.kill()

    def run_ai_generated_code(self, ai_generated_code: str) -> tuple[bool, List[Any]]:
        """
        Run the AI-generated code in the sandbox and return the results.

        The first element of the result list is always the captured stdout. On failure the
        second element is the error traceback. On success the remaining elements are the text
        of each execution result and a ``gr.Image`` for each PNG result.

        :param ai_generated_code: The AI-generated code to run.
        :return: A tuple containing a boolean indicating success and a list of results.
        """
        log.info("Will run following code in the sandbox:")
        log.info("========================== Code ==========================")
        log.info(ai_generated_code)
        log.info("==========================================================\n")
        execution = self.sbx.run_code(ai_generated_code)
        log.info('Code execution finished!')

        response = []
        print_log = "\n".join(execution.logs.stdout)
        response.append(print_log)
        log.info(print_log)

        # First let's check if the code ran successfully.
        if execution.error:
            log.info('AI-generated code had an error.')
            log.info(execution.error.traceback)
            response.append(execution.error.traceback)
            return False, response  # stdout, traceback

        # Iterate over all the results and specifically check for png files that will represent the chart.
        result_idx = 0  # counts saved charts only, not all results
        log.info(f"Number of results: {len(execution.results)}")
        for result in execution.results:
            if result.text:
                response.append(result.text)
            if result.png:
                # Save the png to a file in the current working directory.
                # The png is in base64 format.
                chart_path = f'chart-{result_idx}.png'
                with open(chart_path, 'wb') as f:
                    f.write(base64.b64decode(result.png))
                response.append(gr.Image(chart_path, width=800, show_label=False))
                log.info(f'Chart saved to {chart_path}')
                result_idx += 1

        return True, response

    def init_sandbox(self) -> tuple[str, List[Any], List[str]]:
        """
        Initialize the E2B sandbox by uploading relevant datasets and running setup code.

        If the setup code fails or any exception is raised, the sandbox is killed and the
        process exits with status 1.

        :return: A tuple containing the setup code, the logs from the setup code, and the names of the uploaded datasets
        """
        try:
            file_names = [m.title for m in self.metadata_docs]

            # Upload datasets to sandbox
            cleaned_file_names = []
            for file_name in file_names:
                file_path = get_path_from_title(self.groupOwner, file_name)
                with open(file_path, "rb") as f:
                    sandbox_file_name = get_file_from_title(self.groupOwner, file_name)
                    self.sbx.files.write(sandbox_file_name, f)
                log.info(f"Dataset {sandbox_file_name} uploaded to sandbox!")
                cleaned_file_names.append(sandbox_file_name)

            setup_code = """
                import geopandas as gpd
                import matplotlib.pyplot as plt
                import contextily as cx
                from geopy.geocoders import Nominatim
                geolocator = Nominatim(user_agent="ogd4all")
            """

            # Initial code for LLM to understand data:
            # - For every dataset, print layer names
            for idx, file_name in enumerate(cleaned_file_names):
                name = file_name.split(".")[0]  # remove suffix
                setup_code += f"""
                    # Get layers included in dataset {name}
                    path_{idx} = "{file_name}"
                    print(f"Layers in dataset {name}: {{gpd.list_layers(path_{idx})['name'].to_list()}}")
                """

            # Fix indentation of the code: every line is stripped, so the generated code is flat.
            setup_code = "\n".join([line.strip() for line in setup_code.split("\n")])

            # Run code on sandbox
            success, logs = self.run_ai_generated_code(setup_code)
            if not success:
                log.error('Setup code had an error.')
                log.error(logs)
                self.finalize()
                # SystemExit is not an Exception subclass, so the handler below does not catch it.
                sys.exit(1)

            log.info("Setup code ran successfully!")
            log.info(logs)
            return setup_code, logs, cleaned_file_names
        except Exception as e:
            log.error("Caught an exception in sandbox init: %s", e, exc_info=True, backtrace=True, diagnose=True)
            self.finalize()
            sys.exit(1)

    def init_conversation(self, setup_code, logs, file_names):
        """
        Initialize the conversation with the analyzer by appending the system prompt.

        :param setup_code: The setup code that was run in the sandbox.
        :param logs: The logs from the setup code.
        :param file_names: The names of the uploaded datasets.
        """
        system_prompt = generate_system_prompt(file_names, self.metadata_docs, setup_code, logs)
        log.info("System prompt:")
        log.info(system_prompt)
        self.messages.append(SystemMessage(system_prompt))

    @staticmethod
    def _extract_query_text(query: Union[str, dict]) -> str:
        """Return the text of a query that is either a plain string or a dict with a "text" key."""
        if isinstance(query, dict) and "text" in query:
            return query["text"]
        if isinstance(query, str):
            return query
        return str(query)

    def analyze(self, query: Union[str, dict]):
        """
        Answer a request by repeatedly asking the LLM for code and running it in the sandbox.

        This is a generator. It yields a single ``gr.ChatMessage`` (the running thought log)
        while working. It finally yields a list: the thought message followed by the execution
        results on success, or by an apology string once ``max_steps`` attempts are used up.

        :param query: The user request, either a string or a multimodal dict with a "text" key.
        """
        # Extract text from multimodal input for backward compatibility
        text_query = self._extract_query_text(query)

        self.messages.append(HumanMessage(f"Request/Question: {text_query}\n"))
        start_time = time.time()
        log.info(f"User question: {text_query}")

        thought_msg = gr.ChatMessage(
            role="assistant",
            content="",
            metadata={"title": "Analyzing the data...", "status": "pending"},
        )
        yield thought_msg

        steps = 0
        while steps < self.max_steps:
            steps += 1
            llm_response = self.coding_llm.invoke(self.messages)

            # With include_raw=True the result is a dict, so token usage sits on the raw
            # message rather than on the result itself. It may be missing for some providers.
            usage = getattr(llm_response.get("raw"), "usage_metadata", None) or {}
            self.total_tokens += usage.get("total_tokens", 0)

            codeAct = llm_response["parsed"]
            if codeAct is None:
                # Structured-output parsing failed; spend this step and ask the model again.
                log.warning(
                    "LLM response could not be parsed into a CodeAct, retrying.",
                    parsing_error=str(llm_response.get("parsing_error")),
                )
                continue

            # If the LLM wrapped the code with ```python ``` or ``` ```, remove it
            codeAct.code = codeAct.code.replace("```python", "").replace("```", "").strip()

            thought_msg.content += f"**Thought**: {codeAct.thought}\n"
            thought_msg.content += f"**Code**: \n```python\n{codeAct.code}\n``` \n"
            yield thought_msg

            success, results = self.run_ai_generated_code(codeAct.code)

            if not success:
                # results is [stdout, traceback]. Strip code fences so they cannot break the
                # markdown block (computed outside the f-string to stay valid before Python 3.12).
                stdout_text = results[0].replace("```", "")
                error_text = results[1].replace("```", "")
                thought_msg.content += f"**Output**:\n```\n{stdout_text}\n```\n**Error**:\n```\n{error_text}\n```\n"
                new_msgs = [
                    AIMessage("Thought: " + codeAct.thought),
                    AIMessage("**Code**: " + f"```python\n{codeAct.code}\n```"),
                    AIMessage(f"**Output**: \n{results[0]}\nError: \n{results[1]}"),
                    HumanMessage("Please fix the error and try again."),
                ]
                self.messages = self.messages + new_msgs
                yield thought_msg
                continue

            thought_msg.metadata["status"] = "done"
            thought_msg.metadata["title"] = "Analysis completed"
            thought_msg.metadata["duration"] = time.time() - start_time

            # Concat all text responses (charts are gr.Image objects and are skipped)
            output_text = "\n".join(r for r in results if isinstance(r, str))
            new_msgs = [
                AIMessage("Thought: " + codeAct.thought),
                AIMessage("**Code**: " + f"```python\n{codeAct.code}\n```"),
                AIMessage(f"**Output**: \n{output_text}"),
            ]
            self.messages = self.messages + new_msgs
            yield [thought_msg] + results
            return

        thought_msg.metadata["status"] = "done"
        thought_msg.metadata["title"] = f"Maximum thinking steps ({self.max_steps}) exceeded"
        thought_msg.metadata["duration"] = time.time() - start_time
        yield [thought_msg, "Unfortunately, I was not able to find a solution to your request."]