"""LangChain tools and a CodeAct agent wrapper built on ``langgraph_codeact``."""

import builtins
import contextlib
import io
import os
import subprocess
import sys
import tempfile
import uuid
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse

import requests
from langchain.chat_models import init_chat_model
from langchain_community.document_loaders import ArxivLoader, WikipediaLoader
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_tavily import TavilySearch
from langgraph_codeact import create_codeact
from openai import OpenAI  # audio

# Marker the model is expected to put in front of its final answer.
_FINAL_ANSWER_MARKER = "FINAL ANSWER:"

# Upper bound (seconds) for connecting to / reading from a download URL, so a
# stalled server cannot block the agent forever.
_DOWNLOAD_TIMEOUT_SECONDS = 30


def eval(code: str, _locals: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Execute ``code`` and return its captured stdout plus newly bound names.

    NOTE: the name deliberately shadows the builtin ``eval`` inside this
    module; it is kept because it is the callable handed to
    ``create_codeact`` below.

    SECURITY: ``code`` is run with ``exec`` and no sandboxing whatsoever.
    Only use this with input you are prepared to run on this machine.

    Args:
        code: Python source to execute.
        _locals: Mapping used as the local namespace; it is mutated in place
            by the executed code.

    Returns:
        A ``(output, new_vars)`` tuple. ``output`` is whatever the code
        printed to stdout, or an ``"Error during execution: ..."`` message if
        it raised. ``new_vars`` holds the entries of ``_locals`` whose keys
        did not exist before execution.
    """
    # Snapshot the keys first so newly created variables can be told apart.
    original_keys = set(_locals)

    try:
        with contextlib.redirect_stdout(io.StringIO()) as f:
            exec(code, builtins.__dict__, _locals)
        result = f.getvalue()
    except Exception as e:
        result = f"Error during execution: {e!r}"

    new_vars = {
        key: value for key, value in _locals.items() if key not in original_keys
    }
    return result, new_vars


# Tools


@tool
def execute_python_code(code_path: str) -> str:
    """
    Execute a Python script and return the final output or error.

    Args:
        code_path (str): the path to the Python file to be executed
    """
    if not os.path.exists(code_path):
        return f"Error: file not found at {code_path}"

    try:
        # Use the interpreter running this process rather than whatever
        # "python" happens to resolve to on PATH (it may be absent or belong
        # to a different environment).
        result = subprocess.run(
            [sys.executable, code_path],
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        # Non-zero exit status: report what the script wrote to stderr.
        return f"Execution error: {e.stderr}"
    except Exception as e:
        return f"Unexpected error: {str(e)}"
    return result.stdout


@tool
def speech_to_text(file_path: str) -> str:
    """
    Transcribe an audio file from a local path to text.

    Args:
        file_path (str): Local path of the audio file to be transcribed.
    """
    if not os.path.exists(file_path):
        return f"Error: file not found at {file_path}"

    try:
        # Constructed inside the try so a client configuration error is
        # reported as a tool error string, like every other failure here.
        client = OpenAI()
        with open(file_path, "rb") as file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-mini-transcribe",
                file=file,
            )
        # The SDK returns a response object, not a dict: read the attribute.
        text = transcription.text
        print(f"Transcription result: {text}")
        return text
    except Exception as e:
        return f"Error during transcription: {str(e)}"


@tool
def web_search(query: str) -> str:
    """
    Search Tavily for a query and return formatted results.

    Args:
        query (str): The search query.
    Returns:
        str: A formatted string with the search results.
    """
    try:
        search_tool = TavilySearch(max_results=3, topic="general")
        search_response = search_tool.invoke(input=query)

        # Bail out early when the response carries no "results" entry.
        if not (search_response and "results" in search_response):
            return "No results found."

        return "\n\n---\n\n".join(
            f"Title: {result['title']}\nURL: {result['url']}\nContent: {result['content']}"
            for result in search_response["results"]
        )
    except Exception as e:
        print(f"Error during web search: {str(e)}")
        return f"Error during web search: {str(e)}"


@tool
def arvix_search(query: str) -> str:
    """
    Search Arxiv for a query and return maximum 3 results.

    Args:
        query: The search query.
    """
    try:
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
        # Only the first 1000 characters of each document are returned.
        return "\n\n---\n\n".join(
            f'\n{doc.page_content[:1000]}\n' for doc in search_docs
        )
    except Exception as e:
        return f"Error during Arxiv search: {str(e)}"


@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file and return the path.

    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    temp_dir = tempfile.gettempdir()
    if filename is None:
        # mkstemp hands back an open descriptor; close it right away so the
        # handle is not leaked and the path can be reopened for writing.
        fd, filepath = tempfile.mkstemp(dir=temp_dir)
        os.close(fd)
    else:
        filepath = os.path.join(temp_dir, filename)

    # Written as UTF-8 so that read_file (which decodes UTF-8) round-trips it.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."


@tool
def read_file(file_path: str) -> str:
    """
    Return the raw text of a local file.

    Args:
        file_path (str): Local path of the file to be read.
    """
    try:
        # Undecodable bytes are dropped (errors="ignore") instead of raising.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except Exception as e:
        return f"Error reading {file_path}: {e}"


@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.

    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    try:
        # Derive a name from the URL path when none is given; fall back to a
        # random one if the path has no final component.
        if not filename:
            filename = os.path.basename(urlparse(url).path)
            if not filename:
                filename = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), filename)

        # The context manager releases the streamed connection; the timeout
        # keeps an unresponsive server from hanging the tool indefinitely.
        with requests.get(
            url, stream=True, timeout=_DOWNLOAD_TIMEOUT_SECONDS
        ) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"


# Tools exposed to the agent. Only web_search is currently enabled; the other
# tools defined above are intentionally left out.
tools = [
    # execute_python_code,
    # speech_to_text,
    web_search,
    # arvix_search,
    # read_file,
    # save_and_read_file,
    # download_file_from_url,
]


class CodeActAgent:
    """Callable wrapper that answers a question with a compiled CodeAct graph."""

    def __init__(
        self,
        system_prompt_path: str = "prompts/system_prompt.txt",
        model: str = "gpt-4o",
    ) -> None:
        """Load the system prompt, create the chat model and compile the agent.

        Args:
            system_prompt_path: Path of the text file holding the system prompt.
            model: Model name passed to ``init_chat_model`` (provider "openai").
        """
        self.system_prompt = self.read_system_prompt(system_prompt_path)
        self.llm = init_chat_model(model, model_provider="openai")
        self.compiled_agent = create_codeact(self.llm, tools, eval).compile()

    def read_system_prompt(self, path: str) -> str:
        """Return the contents of the prompt file at ``path``, decoded as UTF-8."""
        return Path(path).read_text(encoding="utf-8")

    def extract_final_answer(self, response: str) -> str:
        """Pull the final answer out of the model's last message.

        When the ``FINAL ANSWER:`` marker is present, the text following its
        first occurrence (up to the next marker, if any) is used; otherwise
        the whole response is used. A single trailing period is removed;
        periods inside the answer (for example in "3.14") are preserved.

        Args:
            response: Raw text of the model's final message.

        Returns:
            The cleaned answer string.
        """
        if _FINAL_ANSWER_MARKER in response:
            answer = response.split(_FINAL_ANSWER_MARKER)[1].strip()
        else:
            # Fallback if the model did not follow the instruction perfectly.
            answer = response.strip()

        # Remove a trailing period, but only at the very end.
        if answer.endswith("."):
            answer = answer[:-1].strip()
        return answer

    def __call__(self, question: str) -> str:
        """Run the agent on ``question`` and return the extracted final answer.

        Args:
            question: The user question, sent after the system prompt.

        Returns:
            The answer extracted from the last message of the graph result.
        """
        inputs = {
            "messages": [
                SystemMessage(content=self.system_prompt),
                HumanMessage(content=question),
            ]
        }

        result = self.compiled_agent.invoke(
            inputs,
            config={"configurable": {"thread_id": "benchmark-test"}},
        )

        final_msg = result["messages"][-1].content
        return self.extract_final_answer(final_msg)