# Source: AgentCourseFinals/agents/tool_calling_agent.py
# Commit bf21663 ("added CodeActAgent for testing"), author Basti-1995.
import os
import subprocess
import sys
import tempfile
import uuid
from typing import TypedDict, Annotated, Optional, List, Dict, Any
from urllib.parse import urlparse

import requests
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.runnables import RunnableLambda
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilySearch
from langgraph.graph import START, StateGraph
from langgraph.graph.message import AnyMessage, add_messages
from langgraph.prebuilt import ToolNode, tools_condition
from openai import OpenAI  # audio
# Tools
@tool
def execute_python_code(code_path: str) -> str:
    """
    Execute a Python script and return its stdout, or an error message.

    Args:
        code_path (str): the path to the Python file to be executed

    Returns:
        str: the script's stdout on success, otherwise a human-readable
        error string (the tool never raises, so the agent loop survives).
    """
    try:
        if not os.path.exists(code_path):
            return f"Error: file not found at {code_path}"
        # Run with the same interpreter that runs the agent instead of
        # whatever 'python' resolves to on PATH, and bound the runtime so
        # a hung script cannot block the agent forever.
        result = subprocess.run(
            [sys.executable, code_path],
            capture_output=True,
            text=True,
            check=True,
            timeout=60,
        )
        return result.stdout
    except subprocess.TimeoutExpired:
        return "Execution error: script timed out after 60 seconds"
    except subprocess.CalledProcessError as e:
        # Non-zero exit status: surface the script's stderr to the LLM
        return f"Execution error: {e.stderr}"
    except Exception as e:
        return f"Unexpected error: {str(e)}"
# (Removed: a fully commented-out earlier draft of speech_to_text that
# duplicated the active implementation below.)
@tool
def speech_to_text(file_path: str) -> str:
    """
    Transcribe an audio file from a local path to text.

    Args:
        file_path (str): Local path of the audio file to be transcribed.

    Returns:
        str: the transcription text, or an error message on failure.
    """
    client = OpenAI()
    try:
        # Check if the file exists
        if not os.path.exists(file_path):
            return f"Error: file not found at {file_path}"
        # Transcribe the audio
        with open(file_path, "rb") as file:
            transcription = client.audio.transcriptions.create(
                model="gpt-4o-mini-transcribe",
                file=file
            )
        # The OpenAI v1 SDK returns a pydantic model, not a dict:
        # transcription["text"] raised TypeError (silently swallowed by
        # the except below, so the tool always failed). Use attribute access.
        print(f"Transcription result: {transcription.text}")
        return transcription.text
    except Exception as e:
        return f"Error during transcription: {str(e)}"
@tool
def web_search(query: str) -> str:
    """
    Search Tavily for a query and return formatted results.

    Args:
        query (str): The search query.

    Returns:
        str: A formatted string with the search results (title, URL and
        content of up to three hits, separated by "---"), or an error /
        no-results message.
    """
    try:
        response = TavilySearch(max_results=3, topic="general").invoke(input=query)
        # Guard clause: bail out early when Tavily returned nothing usable.
        if not response or "results" not in response:
            return "No results found."
        sections = []
        for hit in response["results"]:
            sections.append(
                f"Title: {hit['title']}\nURL: {hit['url']}\nContent: {hit['content']}"
            )
        return "\n\n---\n\n".join(sections)
    except Exception as e:
        print(f"Error during web search: {str(e)}")
        return f"Error during web search: {str(e)}"
@tool
def arvix_search(query: str) -> str:
    """
    Search Arxiv for a query and return maximum 3 results.

    Args:
        query: The search query.

    Returns:
        str: pseudo-XML <Document> blocks (first 1000 chars of each paper),
        or an error message on failure.
    """
    try:
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
        # The opening tag previously ended with "/>", which self-closed the
        # element and left the trailing </Document> dangling; end it with
        # ">" so the content is properly wrapped.
        return "\n\n---\n\n".join(
            [
                f'<Document source="{doc.metadata.get("source", "Unknown")}" page="{doc.metadata.get("page", "N/A")}">\n{doc.page_content[:1000]}\n</Document>'
                for doc in search_docs
            ]
        )
    except Exception as e:
        return f"Error during Arxiv search: {str(e)}"
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
    """
    Save content to a file and return the path.

    Args:
        content (str): the content to save to the file
        filename (str, optional): the name of the file. If not provided, a random name file will be created.

    Returns:
        str: a message containing the path the content was written to.
    """
    temp_dir = tempfile.gettempdir()
    if filename is None:
        # mkstemp instead of NamedTemporaryFile: the previous code left the
        # NamedTemporaryFile handle open (fd leak, and re-opening a file
        # that is still open fails on Windows).
        fd, filepath = tempfile.mkstemp(dir=temp_dir)
        os.close(fd)
    else:
        filepath = os.path.join(temp_dir, filename)
    # Explicit encoding so the written file is UTF-8 on every platform.
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(content)
    return f"File saved to {filepath}. You can read this file to process its contents."
@tool
def read_file(file_path: str) -> str:
    """
    Return the raw text of a local file.

    Args:
        file_path (str): Local path of the file to be read.

    Returns:
        str: the file contents decoded as UTF-8 (undecodable bytes are
        dropped via errors="ignore"), or an error message on failure.
    """
    try:
        # Plain ASCII "utf-8": the original literal contained a Unicode
        # non-breaking hyphen (U+2011) instead of "-", which is not a
        # reliable codec name.
        with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
            return f.read()
    except Exception as e:
        return f"Error reading {file_path}: {e}"
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.

    Args:
        url (str): the URL of the file to download.
        filename (str, optional): the name of the file. If not provided, a random name file will be created.

    Returns:
        str: a message containing the local path of the downloaded file,
        or an error message on failure.
    """
    try:
        # Parse URL to get filename if not provided
        if not filename:
            path = urlparse(url).path
            filename = os.path.basename(path)
            if not filename:
                filename = f"downloaded_{uuid.uuid4().hex[:8]}"
        temp_dir = tempfile.gettempdir()
        filepath = os.path.join(temp_dir, filename)
        # Stream to disk in chunks; the timeout keeps a dead/slow server
        # from hanging the agent indefinitely, and the context manager
        # releases the connection when done.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return f"File downloaded to {filepath}. You can read this file to process its contents."
    except Exception as e:
        return f"Error downloading file: {str(e)}"
# Tool registry: bound to the LLM (bind_tools) and executed by the ToolNode.
# arvix_search / wiki_search are intentionally disabled here.
tools = [
execute_python_code,
speech_to_text,
web_search,
#arvix_search,
#wiki_search
read_file,
save_and_read_file,
download_file_from_url
]
# State
class DummyState(TypedDict):
    # Conversation history. The add_messages reducer APPENDS messages that
    # graph nodes return in {"messages": [...]} instead of replacing the list.
    messages: Annotated[list[AnyMessage], add_messages]
# Agent
class ReActAgent:
    """
    ReAct-style tool-calling agent built on LangGraph.

    The graph alternates between an LLM node and a tool node until the
    LLM replies without a tool call; the answer is then extracted from
    the text following "FINAL ANSWER:".
    """

    def __init__(
        self,
        system_prompt_path: str = "prompts/system_prompt.txt",
        model: str = "gpt-4o"
    ) -> None:
        # Prompt
        self.system_prompt = self.read_system_prompt(system_prompt_path)
        # Initialize the LLM with tools
        self.llm = ChatOpenAI(
            model=model,
            temperature=0
        ).bind_tools(tools)
        # Create a state graph: START -> llm -> (tools -> llm)* -> END
        self.compiled_graph = (
            StateGraph(DummyState)
            .add_node("llm", RunnableLambda(self.llm_response))
            .add_node("tools", ToolNode(tools))
            .add_edge(START, "llm")
            .add_conditional_edges(
                "llm",
                # If the latest assistant message is a tool call,
                # tools_condition routes to "tools"; otherwise to END.
                tools_condition,
            )
            .add_edge("tools", "llm")
            .compile()
        )

    def read_system_prompt(self, path: str) -> str:
        """Read and return the system prompt from *path*."""
        # Explicit encoding so the prompt loads identically on every OS
        # (Windows would otherwise use a legacy locale code page).
        with open(path, "r", encoding="utf-8") as file:
            return file.read()

    def llm_response(self, state: "DummyState") -> dict:
        """Graph node: run the tool-bound LLM over the accumulated messages."""
        print("LLM node called with state:", state)
        response = self.llm.invoke(state["messages"])
        return {"messages": [response]}

    def extract_final_answer(self, response: str) -> str:
        """
        Return the text after "FINAL ANSWER:" (or the whole response if
        the marker is missing), with at most one trailing period removed.
        """
        if "FINAL ANSWER:" in response:
            answer = response.split("FINAL ANSWER:")[1].strip()
        else:
            # fallback if model did not follow instruction perfectly
            answer = response.strip()
        # Strip a single trailing period only. The previous
        # replace(".", "") also destroyed interior dots, mangling
        # numeric answers such as "3.14" into "314".
        if answer.endswith("."):
            answer = answer[:-1].strip()
        return answer

    def __call__(
        self, question: str,
    ) -> str:
        """
        Run the agent on *question* and return the extracted final answer.
        """
        inputs = {
            "messages": [
                SystemMessage(content=self.system_prompt),
                HumanMessage(content=question)
            ]
        }
        # Run the graph with the inputs
        result = self.compiled_graph.invoke(
            inputs,
            config={
                "configurable": {"thread_id": "benchmark-test"}
            }
        )
        final_msg = result["messages"][-1].content
        return self.extract_final_answer(final_msg)