sqfoo's picture
Update agent.py
ea366ea verified
raw
history blame
14.2 kB
import os
from dotenv import load_dotenv
from typing import TypedDict, List, Dict, Any, Optional
from langgraph.graph import StateGraph, START, END, MessagesState
from langchain.agents import create_tool_calling_agent, AgentExecutor, initialize_agent, create_react_agent
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, PromptTemplate
from langgraph.prebuilt import ToolNode
from langgraph.prebuilt import tools_condition
# 1. Web Browsing
from langchain_community.tools import DuckDuckGoSearchResults
from langchain_community.document_loaders import ImageCaptionLoader
import requests, time
import pandas as pd
from pathlib import Path
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
from langchain_community.document_loaders import YoutubeLoader
from langchain_community.document_loaders import UnstructuredExcelLoader
from langchain_community.document_loaders import AssemblyAIAudioTranscriptLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.utilities import GoogleSerperAPIWrapper
load_dotenv()
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
@tool
def duckduck_websearch(query: str) -> str:
"""Allows search through DuckDuckGo.
Args:
query: what you want to search
"""
search = DuckDuckGoSearchResults()
results = search.invoke(query)
return "\n".join(results)
@tool
def serper_websearch(query: str) -> str:
"""Allows search through Serper.
Args:
query: what you want to search
"""
search = GoogleSerperAPIWrapper(serper_api_key=os.getenv("SERPER_API_KEY"))
results = search.run(query)
return results
@tool
def visit_webpage(url: str) -> str:
"""Fetches raw HTML content of a web page.
Args:
url: the webpage url
"""
try:
response = requests.get(url, timeout=5)
return response.text[:5000]
except Exception as e:
return f"[ERROR fetching {url}]: {str(e)}"
@tool
def wiki_search(query: str) -> str:
"""Wiki search tools.
Args:
query: what you want to wiki
"""
api_wrapper = WikipediaAPIWrapper(top_k_results=1, doc_content_chars_max=100)
wikipediatool = WikipediaQueryRun(api_wrapper=api_wrapper)
return wikipediatool.run({"query": query})
@tool
def text_splitter(text: str) -> List[str]:
"""Splits text into chunks using LangChain's CharacterTextSplitter.
Args:
text: A string of text to split.
"""
splitter = CharacterTextSplitter(chunk_size=450, chunk_overlap=10)
return splitter.split_text(text)
@tool
def youtube_transcript(video_url: str) -> str:
"""Fetched youtube transcript
Args:
video_url: YouTube video url
"""
try:
loader = YoutubeLoader.from_youtube_url(video_url)
# video_id = video_url.split("v=")[-1].split("&")[0]
# transcript = YouTubeTranscriptApi.get_transcript(video_id)
return loader.load()
except Exception as e:
return f"Error fetching transcript: {str(e)}"
# 4. File Reading
@tool
def read_file(task_id: str) -> str:
"""First download the file, then read its content
Args:
dir: the task_id
"""
file_url = f'{DEFAULT_API_URL}/files/{task_id}'
r = requests.get(file_url, timeout=15, allow_redirects=True)
with open('temp', "wb") as fp:
fp.write(r.content)
with open('temp') as f:
return f.read()
@tool
def excel_read(task_id: str) -> str:
"""First download the excel file, then read its content
Args:
dir: the task_id
"""
try:
file_url = f'{DEFAULT_API_URL}/files/{task_id}'
r = requests.get(file_url, timeout=15, allow_redirects=True)
with open('temp.xlsx', "wb") as fp:
fp.write(r.content)
# Read the Excel file
df = pd.read_excel('temp.xlsx')
# Run various analyses based on the query
result = (
f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
)
result += f"Columns: {', '.join(df.columns)}\n\n"
# Add summary statistics
result += "Summary statistics:\n"
result += str(df.describe())
return result
except Exception as e:
return f"Error analyzing Excel file: {str(e)}"
@tool
def csv_read(task_id: str) -> str:
"""First download the csv file, then read its content
Args:
dir: the task_id
"""
try:
file_url = f'{DEFAULT_API_URL}/files/{task_id}'
r = requests.get(file_url, timeout=15, allow_redirects=True)
with open('temp.csv', "wb") as fp:
fp.write(r.content)
# Read the CSV file
df = pd.read_csv(temp.csv)
# Run various analyses based on the query
result = (
f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
)
result += f"Columns: {', '.join(df.columns)}\n\n"
# Add summary statistics
result += "Summary statistics:\n"
result += str(df.describe())
return result
except Exception as e:
return f"Error analyzing CSV file: {str(e)}"
@tool
def mp3_listen(task_id: str) -> str:
"""First download the mp3 file, then listen to it
Args:
dir: the task_id
"""
file_url = f'{DEFAULT_API_URL}/files/{task_id}'
r = requests.get(file_url, timeout=15, allow_redirects=True)
with open('temp.mp3', "wb") as fp:
fp.write(r.content)
loader = AssemblyAIAudioTranscriptLoader(file_path="temp.mp3", api_key=os.getenv("AssemblyAI_API_KEY"))
docs = loader.load()
contents = [doc.page_content for doc in docs]
return "\n".join(contents)
# 5. Image Open
@tool
def image_caption(dir: str) -> str:
"""Understand the content of the provided image
Args:
dir: the image url link
"""
loader = ImageCaptionLoader(images=[dir])
metadata = loader.load()
return metadata[0].page_content
# 2. Coding
from langchain_experimental.tools import PythonREPLTool
@tool
def run_python(code: str):
""" Run the given python code
Args:
code: the python code
"""
return PythonREPLTool().run(code)
@tool
def multiply(a: float, b: float) -> float:
"""Multiply two numbers.
Args:
a: first float
b: second float
"""
return a * b
@tool
def add(a: float, b: float) -> float:
"""Add two numbers.
Args:
a: first float
b: second float
"""
return a + b
@tool
def subtract(a: float, b: float) -> float:
"""Subtract two numbers.
Args:
a: first float
b: second float
"""
return a - b
@tool
def divide(a: float, b: float) -> float:
"""Divide two numbers.
Args:
a: first float
b: second float
"""
if b == 0:
raise ValueError("Cannot divide by zero.")
return a / b
# 3. Multi-Modality
# - multiply: multiply two numbers, A and B
# - add: add two numbers, A and B
# - subtract: Subtract A by B with passing A as the first argument
# - divide: Divide A by B with passing A as the first argument
# You have access to the following tools:
# - serper_websearch: web search the content of the query by passing the query as input with Serper Search Engine
# - duckduck_websearch: web search the content of the query by passing the query as input with DuckDuckGo Search Engine
# - visit_webpage: visit the given webpage url by passing the url as input
# - wiki_search: wiki search the content of the query by passing the query as input if the question asks for wiki search it
# - text_splitter: split text into chunks
# - youtube_transcript: fetch the transcript of the Youtube video by passing the video url as input if the question asks for watching a Youtube video
# - read_file: read the content of the attached file by passing the TASK-ID as input
# - excel_read: read the content of the attached excel file by passing the TASK-ID as input
# - csv_read: read the content of the attached csv file by passing the TASK-ID as input
# - mp3_listen: listen to the content of the attached mp3 file by passing the TASK-ID as input
# - image_caption: understand the visual content of the attached image by passing the TASK-ID as input
# - run_python: run the python code
# ("human", f"Question: {question}\nReport to validate: {final_answer}")
class BasicAgent:
def __init__(self):
self.model = ChatGoogleGenerativeAI(
model="gemini-2.0-flash-lite",
temperature=0,
max_tokens=128,
timeout=None,
max_retries=2,
google_api_key=os.getenv("GEMINI_API_KEY"),
# other params...
)
# self.model = ChatGroq(
# model="qwen-qwq-32b",
# temperature=0,
# max_tokens=128,
# timeout=None,
# max_retries=2,
# groq_api_key=os.getenv("GROQ_API_KEY")
# # other params...
# )
# System Prompt for few shot prompting
self.sys_prompt = """"
You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template:
FINAL ANSWER: [YOUR FINAL ANSWER].
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separared list of numbers and/or strings.
If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise.
If you are asked for a string, don't use articles, neither abbreviations (eg. for cities), and write the digits in plain text unless specified otherwise.
If you are asked for a comma separated list, apply the above rules depending of whether the element to put in the list is a number or a string.
You have access to the following tools:
{tools}
Here are the tools you can use: {tool_names}
If Task ID is included in the question, remember to call the relevant read tools [ie. read_file, excel_read, csv_read, mp3_listen, image_caption]
Note: python_tool is called when the question mentions the term "Python" or any math calculation.
Follow this format in your response:
THOUGHT: [Describe your reasoning here]
ACTION: [Specify the action/tool to use and any relevant input]
OBSERVATIOn: [Result of the action/tool, provided by the system]
FINAL ANSWER: [Provide your final response to the user]
User Input: {input}
{agent_scratchpad}
"""
self.tools = [duckduck_websearch, serper_websearch, visit_webpage, wiki_search, text_splitter, youtube_transcript, read_file, excel_read, csv_read, mp3_listen, image_caption, run_python]
# self.model_with_tools = self.model.bind_tools(self.tools)
# self.sys_msg = SystemMessage(content=self.sys_prompt)
# self.prompt = ChatPromptTemplate.from_messages([
# ("system", self.sys_prompt),
# ("human", "{input}")
# ])
self.prompt = PromptTemplate(
input_variables=["input", "tools", "tool_names", "agent_scratchpad"],
template=self.sys_prompt
)
# self.agent = initialize_agent(
# tools=self.tools,
# llm=self.model,
# agent="zero-shot-react-description", # ReAct agent type
# verbose=True,
# system_prompt=self.prompt,
# handle_parsing_errors="Check your output and make sure it conforms, use the Action/Action Input syntax"
# )
self.agent = create_react_agent(
llm=self.model,
tools=self.tools,
prompt=self.prompt
)
self.agent_exe = AgentExecutor(agent=self.agent, tools=self.tools, verbose=True,
handle_parsing_errors="Check your output and make sure it conforms, use the Action/Action Input syntax")
# self.graph = self.__graph_compile__()
print("BasicAgent initialized.")
def __call__(self, task: dict) -> str:
task_id, question, file_name = task["task_id"], task["question"], task["file_name"]
print(f"Agent received question (first 50 chars): {question[:50]}...")
if file_name == "" or file_name is None:
question = question
else:
question = f"{question} with TASK-ID: {task_id}"
# fixed_answer = self.agent.run(f'{question} with TASK-ID: {task_id}')
# fixed_answer = "This is a default answer."
# fixed_answer = self.agent.run(question)
fixed_answer = self.agent_exe.invoke({"input": question})
# human_message = [HumanMessage(content=question)]
# messages = self.graph.invoke({"messages": human_message})
# fixed_answer = messages['messages'][-1].content
print(f"Agent returning fixed answer: {fixed_answer}")
time.sleep(60)
return fixed_answer
def __graph_compile__(self):
def assistant(state: MessagesState):
"""Assistant Node"""
return {"message": [self.model_with_tools.invoke(state["messages"])]}
builder = StateGraph(MessagesState)
builder.add_node("assistant", assistant)
builder.add_node("tools", ToolNode(self.tools))
builder.add_edge(START, "assistant")
builder.add_conditional_edges(
"assistant",
# If the latest message (result) from assistant is a tool call -> tools_condition routes to tools
# If the latest message (result) from assistant is a not a tool call -> tools_condition routes to END
tools_condition,
)
builder.add_edge("tools", "assistant")
# Compile graph
return builder.compile()