import os
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition
from langgraph.prebuilt import ToolNode
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_huggingface import (
ChatHuggingFace,
HuggingFaceEndpoint,
HuggingFaceEmbeddings,
)
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.document_loaders import ArxivLoader
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from langchain_core.tools import tool
from langchain_core.documents import Document
from sklearn.metrics.pairwise import cosine_similarity
import ast
load_dotenv()
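
# The model providers and search tools below read their API keys from the
# environment (populated from .env above): GOOGLE_API_KEY for Gemini,
# GROQ_API_KEY for Groq, TAVILY_API_KEY for Tavily web search, and
# HUGGINGFACEHUB_API_TOKEN for Hugging Face endpoints.
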
@tool
def multiply(a: int, b: int) -> int:
"""Multiply two numbers.
Args:
a: first int
b: second int
"""
return a * b
@tool
def add(a: int, b: int) -> int:
"""Add two numbers.
Args:
a: first int
b: second int
"""
return a + b
@tool
def subtract(a: int, b: int) -> int:
"""Subtract two numbers.
Args:
a: first int
b: second int
"""
return a - b
@tool
def divide(a: int, b: int) -> float:
"""Divide two numbers.
Args:
a: first int
b: second int
"""
if b == 0:
raise ValueError("Cannot divide by zero.")
return a / b
@tool
def modulus(a: int, b: int) -> int:
"""Get the modulus of two numbers.
Args:
a: first int
b: second int
"""
return a % b
@tool
def wiki_search(query: str) -> str:
"""Search Wikipedia for a query and return maximum 2 results.
Args:
query: The search query."""
search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
formatted_search_docs = "\n\n---\n\n".join(
[
f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
for doc in search_docs
]
)
return {"wiki_results": formatted_search_docs}
@tool
def web_search(query: str) -> str:
    """Search Tavily for a query and return a maximum of 3 results.
    Args:
        query: The search query."""
    search_docs = TavilySearchResults(max_results=3).invoke({"query": query})
    # TavilySearchResults returns a list of dicts with "url" and "content" keys.
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document source="{doc["url"]}"/>\n{doc["content"]}\n</Document>'
            for doc in search_docs
        ]
    )
    return formatted_search_docs
@tool
def arxiv_search(query: str) -> str:
    """Search arXiv for a query and return a maximum of 3 results.
    Args:
        query: The search query."""
    search_docs = ArxivLoader(query=query, load_max_docs=3).load()
    # ArxivLoader documents expose metadata keys such as "Title" and
    # "Published" rather than a "source" URL, so format with those.
    formatted_search_docs = "\n\n---\n\n".join(
        [
            f'<Document title="{doc.metadata.get("Title", "")}" published="{doc.metadata.get("Published", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        ]
    )
    return formatted_search_docs
# Load CSV data and embeddings
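# The retriever below expects the CSV to provide three columns: "content"
# (the document text), "metadata" (a stringified dict), and "embedding"
# (a stringified list of floats produced by the same sentence-transformers
# model that embeds the queries).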
class LocalCSVRetriever:
def __init__(self, csv_file_path="supabase_docs.csv"):
self.csv_file_path = csv_file_path
self.df = None
self.embeddings_model = HuggingFaceEmbeddings(
model_name="sentence-transformers/all-mpnet-base-v2"
)
self.load_data()
def load_data(self):
"""Load data from CSV file"""
try:
self.df = pd.read_csv(self.csv_file_path)
print(f"Loaded {len(self.df)} documents from {self.csv_file_path}")
# Convert string representation of embeddings back to numpy arrays
if 'embedding' in self.df.columns:
self.df['embedding_array'] = self.df['embedding'].apply(
lambda x: np.array(ast.literal_eval(x)) if isinstance(x, str) else np.array(x)
)
except FileNotFoundError:
print(f"CSV file {self.csv_file_path} not found!")
self.df = pd.DataFrame()
except Exception as e:
print(f"Error loading CSV: {e}")
self.df = pd.DataFrame()
    def similarity_search(self, query: str, k: int = 1):
        """Perform similarity search over the local CSV data."""
        if self.df.empty or "embedding_array" not in self.df.columns:
            return []
        # Embed the query with the same model used for the stored documents
        query_embedding = np.array(self.embeddings_model.embed_query(query)).reshape(1, -1)
        # Brute-force scan: cosine similarity against every stored embedding
        similarities = []
        for idx, row in self.df.iterrows():
            doc_embedding = row["embedding_array"].reshape(1, -1)
            similarity = cosine_similarity(query_embedding, doc_embedding)[0][0]
            similarities.append((idx, similarity, row["content"]))
        # Sort by similarity (descending) and keep the top k
        similarities.sort(key=lambda x: x[1], reverse=True)
        results = []
        for idx, _score, content in similarities[:k]:
            raw_metadata = self.df.loc[idx, "metadata"]
            metadata = (
                ast.literal_eval(raw_metadata)
                if isinstance(raw_metadata, str)
                else raw_metadata
            )
            results.append(Document(page_content=content, metadata=metadata))
        return results
# Initialize the local retriever
local_retriever = LocalCSVRetriever()
# load the system prompt from the file
with open("system_prompt.txt", "r", encoding="utf-8") as f:
system_prompt = f.read()
# System message
sys_msg = SystemMessage(content=system_prompt)
tools = [
multiply,
add,
subtract,
divide,
modulus,
wiki_search,
web_search,
    arxiv_search,
]
# Build graph function
def build_graph(provider: str = "groq"):
"""Build the graph"""
# Load environment variables from .env file
if provider == "google":
# Google Gemini
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
elif provider == "groq":
# Groq https://console.groq.com/docs/models
llm = ChatGroq(
model="qwen-qwq-32b", temperature=0
        )  # alternative: gemma2-9b-it
elif provider == "huggingface":
# TODO: Add huggingface endpoint
llm = ChatHuggingFace(
llm=HuggingFaceEndpoint(
url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
temperature=0,
),
)
else:
raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.")
# Bind tools to LLM
llm_with_tools = llm.bind_tools(tools)
# Node
    def assistant(state: MessagesState):
        """Assistant node: call the tool-enabled LLM with the system prompt."""
        return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}
def retriever(state: MessagesState):
"""Modified retriever to use local CSV data"""
query = state["messages"][-1].content
similar_docs = local_retriever.similarity_search(query, k=1)
# Handle empty results
if not similar_docs:
return {
"messages": [
AIMessage(
content="I don't have information about this topic in my knowledge base. Please try a different question."
)
]
}
similar_doc = similar_docs[0]
content = similar_doc.page_content
if "Final answer :" in content:
answer = content.split("Final answer :")[-1].strip()
else:
answer = content.strip()
# Ensure answer is not empty
if not answer:
answer = "I found related information but couldn't extract a clear answer. Please rephrase your question."
return {"messages": [AIMessage(content=answer)]}
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever)
    # The retriever is both the entry and the finish point: the compiled
    # graph answers directly from the local CSV and never routes through
    # the assistant/tool nodes defined above.
    builder.set_entry_point("retriever")
    builder.set_finish_point("retriever")
    # Compile graph
    return builder.compile()
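
# Minimal usage sketch: build the retriever-only graph and ask a sample
# question. Assumes GROQ_API_KEY is set and that supabase_docs.csv and
# system_prompt.txt sit next to this file; the question is a placeholder.
if __name__ == "__main__":
    graph = build_graph(provider="groq")
    result = graph.invoke(
        {"messages": [HumanMessage(content="What is the capital of France?")]}
    )
    for message in result["messages"]:
        message.pretty_print()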