"""ReAct-style LangGraph agent and the tools it is allowed to call."""

import os
import subprocess
import sys
import tempfile
import uuid
from typing import TypedDict, Annotated, Optional, List, Dict, Any
from urllib.parse import urlparse

import requests
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch
from langgraph.graph import START
from langgraph.graph import StateGraph
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition
from openai import OpenAI  # audio

# Upper bound (seconds) for connecting to / reading from a download URL, so a
# stalled server cannot block the agent forever.
_DOWNLOAD_TIMEOUT_SECONDS = 30

# Marker the system prompt is expected to make the model emit before its answer.
_FINAL_ANSWER_MARKER = "FINAL ANSWER:"


# Tools
#
# NOTE: the docstring of every @tool function is used by LangChain as the tool
# description shown to the model, so those docstrings are runtime text. Extra
# maintainer documentation lives in `#` comments instead.

@tool
def execute_python_code(code_path: str) -> str:
    """
    Execute a Python script and return the final output or error.
    Args:
        code_path (str): the path to the Python file to be executed
    """
    # Returns the script's stdout on success; every failure is reported as a
    # string (never raised) so the model can read it and react.
    try:
        if not os.path.exists(code_path):
            return f"Error: file not found at {code_path}"

        # Use the interpreter running this process so the script sees the same
        # environment; a bare 'python' may be missing or a different install.
        result = subprocess.run(
            [sys.executable or "python", code_path],
            capture_output=True,
            text=True,
            check=True,
        )
        return result.stdout
    except subprocess.CalledProcessError as e:
        # Non-zero exit status: surface what the script wrote to stderr.
        return f"Execution error: {e.stderr}"
    except Exception as e:
        return f"Unexpected error: {str(e)}"


@tool
def speech_to_text(file_path: str) -> str:
    """
    Transcribe an audio file from a local path to text.
    Args:
        file_path (str): Local path of the audio file to be transcribed.
    """
    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            return f"Error: file not found at {file_path}"

        # Built inside the try so a missing API key is reported like any other
        # transcription failure instead of escaping the tool.
        client = OpenAI()

        # Transcribe the audio
        with open(file_path, "rb") as file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-mini-transcribe",
                file=file,
            )

        # The SDK returns a response object, not a dict: the transcript is the
        # `.text` attribute (subscripting it raises TypeError).
        text = transcription.text
        print(f"Transcription result: {text}")
        return text
    except Exception as e:
        return f"Error during transcription: {str(e)}"


@tool
def web_search(query: str) -> str:
    """
    Search Tavily for a query and return formatted results.
    Args:
        query (str): The search query.
    Returns:
        str: A formatted string with the search results.
    """
    try:
        search_tool = TavilySearch(max_results=3, topic="general")
        search_response = search_tool.invoke(input=query)

        # Check if the response contains results
        if search_response and "results" in search_response:
            results = search_response["results"]
            formatted_results = "\n\n---\n\n".join(
                [
                    f"Title: {result['title']}\nURL: {result['url']}\nContent: {result['content']}"
                    for result in results
                ]
            )
            return formatted_results
        else:
            return "No results found."
    except Exception as e:
        print(f"Error during web search: {str(e)}")
        return f"Error during web search: {str(e)}"


@tool
def arvix_search(query: str) -> str:
    """
    Search Arxiv for a query and return maximum 3 results.
    Args:
        query: The search query.
    """
    # Only the first 1000 characters of each document are returned, to keep
    # the tool output short.
    try:
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
        return "\n\n---\n\n".join(
            [
                f'\n{doc.page_content[:1000]}\n'
                for doc in search_docs
            ]
        )
    except Exception as e:
        return f"Error during Arxiv search: {str(e)}"


@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file and return the path.
    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    temp_dir = tempfile.gettempdir()

    # Keep only the final path component so a name such as "../x" or "/etc/x"
    # cannot place the file outside the temp directory.
    safe_name = os.path.basename(filename) if filename else ""
    if safe_name:
        filepath = os.path.join(temp_dir, safe_name)
    else:
        # Only the generated name is needed; the context manager closes the
        # handle so it is not leaked (delete=False keeps the file on disk).
        with tempfile.NamedTemporaryFile(delete=False, dir=temp_dir) as temp_file:
            filepath = temp_file.name

    # Written as UTF-8 so that read_file (which decodes UTF-8) round-trips it.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)

    return f"File saved to {filepath}. You can read this file to process its contents."


@tool
def read_file(file_path: str) -> str:
    """
    Return the raw text of a local file.
    Args:
        file_path (str): Local path of the file to be read.
    """
    try:
        # The encoding name must be plain ASCII "utf-8"; undecodable bytes are
        # dropped (errors="ignore") rather than failing the whole read.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except Exception as e:
        return f"Error reading {file_path}: {e}"


@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. If not provided, a random name file will be created.
    """
    try:
        # Take the name from the caller, else from the URL path; either way
        # keep only the last path component so the file stays in the temp dir.
        candidate = filename if filename else urlparse(url).path
        filename = os.path.basename(candidate)
        if not filename:
            filename = f"downloaded_{uuid.uuid4().hex[:8]}"

        # Create temporary file path
        filepath = os.path.join(tempfile.gettempdir(), filename)

        # Stream the body to disk in chunks. The context manager releases the
        # connection; the timeout prevents an unresponsive server from hanging
        # the agent.
        with requests.get(
            url, stream=True, timeout=_DOWNLOAD_TIMEOUT_SECONDS
        ) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"


tools = [
    execute_python_code,
    speech_to_text,
    web_search,
    #arvix_search,
    #wiki_search
    read_file,
    save_and_read_file,
    download_file_from_url
]


# State
class DummyState(TypedDict):
    # Conversation history; `add_messages` is the reducer LangGraph uses to
    # merge the messages returned by a node into this list.
    messages: Annotated[list[AnyMessage], add_messages]


# Agent
class ReActAgent:
    """Tool-calling agent: an LLM node and a tool node wired in a loop.

    The graph starts at the LLM node. `tools_condition` routes to the tool
    node when the latest assistant message contains a tool call and ends the
    run otherwise; tool results are always fed back to the LLM node.
    """

    def __init__(
        self,
        system_prompt_path: str = "prompts/system_prompt.txt",
        model: str = "gpt-4o"
    ) -> None:
        """Load the system prompt, bind the tools to the LLM, compile the graph.

        Args:
            system_prompt_path: Path of the text file holding the system prompt.
            model: Model name passed to `ChatOpenAI`.
        """
        # Prompt
        self.system_prompt = self.read_system_prompt(system_prompt_path)

        # Initialize the LLM with tools
        self.llm = ChatOpenAI(
            model=model,
            temperature=0
        ).bind_tools(tools)

        # Create a state graph
        self.compiled_graph = (
            StateGraph(DummyState)
            .add_node("llm", RunnableLambda(self.llm_response))
            .add_node("tools", ToolNode(tools))
            .add_edge(START, "llm")
            .add_conditional_edges(
                "llm",
                # If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
                # If the latest message (result) from assistant is not a tool call -> tools_condition routes to END
                tools_condition,
            )
            .add_edge("tools", "llm")
            .compile()
        )

    def read_system_prompt(self, path: str) -> str:
        """Return the full text of the prompt file at `path` (read as UTF-8)."""
        with open(path, "r", encoding="utf-8") as file:
            return file.read()

    def llm_response(self, state: DummyState) -> dict:
        """Graph node: invoke the tool-bound LLM on the conversation so far.

        Returns a state update holding only the new assistant message.
        """
        print("LLM node called with state:", state)
        response = self.llm.invoke(state["messages"])
        return {"messages": [response]}

    def extract_final_answer(self, response: str) -> str:
        """Return the answer text from a model reply.

        If the reply contains "FINAL ANSWER:", the text after the last
        occurrence is used; otherwise the whole reply is used as a fallback.
        Surrounding whitespace and a single trailing period are removed.
        Periods inside the answer (e.g. "3.14") are preserved.
        """
        # rsplit yields the whole string when the marker is absent, which is
        # exactly the fallback for replies that ignore the instruction.
        answer = response.rsplit(_FINAL_ANSWER_MARKER, 1)[-1].strip()

        # Remove trailing period, but only at the end
        if answer.endswith("."):
            answer = answer[:-1].strip()
        return answer

    def __call__(
        self,
        question: str,
        #file_path: str=None
    ) -> str:
        """Run the agent on `question` and return the extracted final answer."""
        inputs = {
            "messages": [
                SystemMessage(content=self.system_prompt),
                HumanMessage(content=question)
            ]
        }
        # Add file path if available
        #inputs["file_path"] = file_path or None  # type: ignore

        # Run the graph with the inputs
        result = self.compiled_graph.invoke(
            inputs,
            config={
                "configurable": {"thread_id": "benchmark-test"}
            }
        )

        final_msg = result["messages"][-1].content
        return self.extract_final_answer(final_msg)