AgentCourseFinals / agents /coding_agent.py
Basti-1995's picture
added CodeActAgent for testing
bf21663
import builtins
import contextlib
import io
from typing import Any
from langchain.chat_models import init_chat_model
from langgraph_codeact import create_codeact
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_community.document_loaders import WikipediaLoader, ArxivLoader
from openai import OpenAI # audio
import os
import requests
import subprocess
from typing import Optional, Any
import tempfile
from urllib.parse import urlparse
import uuid
from langchain_tavily import TavilySearch
from pathlib import Path # new import
def eval(code: str, _locals: dict[str, Any]) -> tuple[str, dict[str, Any]]:
# Store original keys before execution
original_keys = set(_locals.keys())
try:
with contextlib.redirect_stdout(io.StringIO()) as f:
exec(code, builtins.__dict__, _locals)
result = f.getvalue() or "<code ran, no output printed to stdout>"
except Exception as e:
result = f"Error during execution: {repr(e)}"
# Determine new variables created during execution
new_keys = set(_locals.keys()) - original_keys
new_vars = {key: _locals[key] for key in new_keys}
return result, new_vars
# Tools
@tool
def execute_python_code(code_path: str) -> str:
"""
Execute a Python script and return the final output or error.
Args:
code_path (str): the path to the Python file to be executed
"""
try:
if not os.path.exists(code_path):
return f"Error: file not found at {code_path}"
# Execute the Python file and capture output
result = subprocess.run(
['python', code_path],
capture_output=True,
text=True,
check=True
)
return result.stdout
except subprocess.CalledProcessError as e:
# Capture any error that occurs during execution
return f"Execution error: {e.stderr}"
except Exception as e:
return f"Unexpected error: {str(e)}"
#@tool
#def speech_to_text(file_path: str) -> str:
# """
# Transcribe an audio file from a local path to text.
# Args:
# file_path (str): Local path of the audio file to be transcribed.
# """
# client = OpenAI()
# try:
# Check if the file exists
# if not os.path.exists(file_path):
# return f"Error: file not found at {file_path}"
# Step 2: Transcribe the audio
# with open(file_path, "rb") as file:
# transcription = client.audio.transcriptions.create(
# model="gpt-4o-mini-transcribe",
# file=file
# )
# print(f"Transcription result: {transcription['text']}")
# return transcription["text"]
# except Exception as e:
# return f"Error during transcription: {str(e)}"
@tool
def speech_to_text(file_path: str) -> str:
"""
Transcribe an audio file from a local path to text.
Args:
file_path (str): Local path of the audio file to be transcribed.
"""
client = OpenAI()
try:
# Check if the file exists
if not os.path.exists(file_path):
return f"Error: file not found at {file_path}"
# Transcribe the audio
with open(file_path, "rb") as file:
transcription = client.audio.transcriptions.create(
model="gpt-4o-mini-transcribe",
file=file
)
print(f"Transcription result: {transcription['text']}")
return transcription["text"]
except Exception as e:
return f"Error during transcription: {str(e)}"
@tool
def web_search(query: str) -> str:
"""
Search Tavily for a query and return formatted results.
Args:
query (str): The search query.
Returns:
str: A formatted string with the search results.
"""
try:
search_tool = TavilySearch(max_results=3, topic="general")
search_response = search_tool.invoke(input=query)
# Check if the response contains results
if search_response and "results" in search_response:
results = search_response["results"]
return "\n\n---\n\n".join(
[
f"Title: {result['title']}\nURL: {result['url']}\nContent: {result['content']}"
for result in results
]
)
else:
return "No results found."
except Exception as e:
print(f"Error during web search: {str(e)}")
return f"Error during web search: {str(e)}"
@tool
def arvix_search(query: str) -> str:
"""
Search Arxiv for a query and return maximum 3 results.
Args:
query: The search query.
"""
try:
search_docs = ArxivLoader(query=query, load_max_docs=3).load()
return "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata.get("source", "Unknown")}" page="{doc.metadata.get("page", "N/A")}"/>\n{doc.page_content[:1000]}\n</Document>'
for doc in search_docs
]
)
except Exception as e:
return f"Error during Arxiv search: {str(e)}"
@tool
def save_and_read_file(content: str, filename: Optional[str] = None) -> str:
"""
Save content to a file and return the path.
Args:
content (str): the content to save to the file
filename (str, optional): the name of the file. If not provided, a random name file will be created.
"""
temp_dir = tempfile.gettempdir()
if filename is None:
temp_file = tempfile.NamedTemporaryFile(delete=False, dir=temp_dir)
filepath = temp_file.name
else:
filepath = os.path.join(temp_dir, filename)
with open(filepath, "w") as f:
f.write(content)
return f"File saved to {filepath}. You can read this file to process its contents."
@tool
def read_file(file_path: str) -> str:
"""
Return the raw text of a local file.
Args:
file_path (str): Local path of the file to be read.
"""
try:
with open(file_path, "r", encoding="utf‑8", errors="ignore") as f:
return f.read()
except Exception as e:
return f"Error reading {file_path}: {e}"
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
"""
Download a file from a URL and save it to a temporary location.
Args:
url (str): the URL of the file to download.
filename (str, optional): the name of the file. If not provided, a random name file will be created.
"""
try:
# Parse URL to get filename if not provided
if not filename:
path = urlparse(url).path
filename = os.path.basename(path)
if not filename:
filename = f"downloaded_{uuid.uuid4().hex[:8]}"
# Create temporary file
temp_dir = tempfile.gettempdir()
filepath = os.path.join(temp_dir, filename)
# Download the file
response = requests.get(url, stream=True)
response.raise_for_status()
# Save the file
with open(filepath, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return f"File downloaded to {filepath}. You can read this file to process its contents."
except Exception as e:
return f"Error downloading file: {str(e)}"
tools = [
#execute_python_code,
#speech_to_text,
web_search,
#arvix_search,
#wiki_search
#read_file,
#save_and_read_file,
#download_file_from_url
]
class CodeActAgent:
def __init__(
self,
system_prompt_path: str = "prompts/system_prompt.txt",
model: str = "gpt-4o"
) -> None:
self.system_prompt = Path(system_prompt_path).read_text(encoding="utf-8")#self.read_system_prompt(system_prompt_path)
self.llm = init_chat_model(model, model_provider="openai")
self.compiled_agent = create_codeact(
self.llm,
tools,
eval
).compile()
def read_system_prompt(self, path:str) -> str:
with open(path, "r") as file:
return file.read()
def extract_final_answer(self, response: str) -> str:
if "FINAL ANSWER:" in response:
answer = response.split("FINAL ANSWER:")[1].strip().replace(".", "")
else:
# fallback if model did not follow instruction perfectly
answer = response.strip()
# Remove trailing period, but only at the end
if answer.endswith("."):
answer = answer[:-1].strip()
return answer
def __call__(
self, question: str,
#file_path: str=None
) -> str:
inputs = {
"messages": [
SystemMessage(content=self.system_prompt),
HumanMessage(content=question)
]
}
# Add file path if available
#inputs["file_path"] = file_path or None # type: ignore
# Run the graph with the inputs
result = self.compiled_agent.invoke(
inputs,
config={
"configurable": {"thread_id": "benchmark-test"}
}
)
final_msg = result["messages"][-1].content
return self.extract_final_answer(final_msg)