# agent / agent.py
# rohittayde's picture
# Update agent.py
# ade0954 verified
"""LangGraph Agent (patched for robustness)"""
import os
import traceback
from dotenv import load_dotenv
from langgraph.graph import START, StateGraph, MessagesState
from langgraph.prebuilt import tools_condition
from langgraph.prebuilt import ToolNode
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.document_loaders import WikipediaLoader
from langchain_community.document_loaders import ArxivLoader
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_core.messages import SystemMessage, HumanMessage
from langchain_core.tools import tool
from supabase.client import Client, create_client
# --- Safe import + fallback for langchain.tools.retriever.create_retriever_tool ---
try:
    # Prefer the real helper when the installed langchain provides it.
    from langchain.tools.retriever import create_retriever_tool  # type: ignore
    HAS_CREATE_RETRIEVER_TOOL = True
except Exception:
    # Deliberately broad: any import-time failure degrades to the local
    # fallback below instead of preventing the module from loading.
    HAS_CREATE_RETRIEVER_TOOL = False
    print("Warning: langchain.tools.retriever.create_retriever_tool not found. Using local fallback.")
    print(traceback.format_exc())

    class _SimpleRetrieverTool:
        """Minimal tool-like wrapper exposing ``name``, ``description`` and ``run(query)``.

        It only covers the ``tool.run(query)`` calling convention; adapt it if
        the calling code relies on a different interface.
        """

        # Retriever method names probed in order before falling back to
        # calling the retriever object itself.
        _LOOKUP_METHODS = ("invoke", "get_relevant_documents", "retrieve")

        def __init__(self, retriever, name="retriever", description=""):
            self.name = name
            self.description = description
            self._retriever = retriever

        def _fetch(self, query: str):
            """Return raw results using the first lookup method the retriever offers."""
            for method_name in self._LOOKUP_METHODS:
                method = getattr(self._retriever, method_name, None)
                if callable(method):
                    return method(query)
            # Some retrievers are plain callables that return results directly.
            return self._retriever(query)

        def run(self, query: str) -> str:
            """Run ``query`` against the retriever and return the hits as text.

            Args:
                query: The search string handed to the retriever.

            Returns:
                The non-empty, stripped document texts joined by blank lines,
                or a ``[retriever-fallback-error] ...`` string when the
                retriever raises.
            """
            try:
                docs = self._fetch(query)
            except Exception as e:
                return f"[retriever-fallback-error] {e}"

            # A bare string is a single result, not an iterable of characters.
            if isinstance(docs, str):
                docs = [docs]

            # Normalize every hit (Document-like object, dict, or anything
            # else) into a stripped string.
            out_texts = []
            for d in docs or []:
                text = getattr(d, "page_content", None)
                if text is None:
                    if isinstance(d, dict):
                        text = d.get("page_content") or d.get("text") or str(d)
                    else:
                        text = str(d)
                if text:
                    # str() guards against non-string page content.
                    out_texts.append(str(text).strip())
            return "\n\n".join(t for t in out_texts if t)

    def create_retriever_tool(retriever, name: str = "retriever", description: str = ""):
        """Minimal drop-in fallback returning an object with ``.run(query)``.

        Replace with the real langchain helper once the package is pinned.

        Args:
            retriever: Object used to look up documents for a query.
            name: Name stored on the returned tool.
            description: Description stored on the returned tool.

        Returns:
            A ``_SimpleRetrieverTool`` wrapping ``retriever``.
        """
        return _SimpleRetrieverTool(retriever, name=name, description=description)
# Read a .env file into the process environment before the os.environ
# lookups later in this module run.
load_dotenv()
@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.
    Args:
        a: first int
        b: second int
    """
    # The docstring text is left as-is: the @tool decorator exposes it as the
    # tool description.
    product = a * b
    return product
@tool
def add(a: int, b: int) -> int:
    """Add two numbers.
    Args:
        a: first int
        b: second int
    """
    # The docstring text is left as-is: the @tool decorator exposes it as the
    # tool description.
    total = a + b
    return total
@tool
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.
    Args:
        a: first int
        b: second int
    """
    # The docstring text is left as-is: the @tool decorator exposes it as the
    # tool description.
    difference = a - b
    return difference
@tool
def divide(a: int, b: int) -> float:
    """Divide two numbers.
    Args:
        a: first int
        b: second int
    """
    # True division: the result is a float even for two ints, hence the
    # float return annotation.
    if b == 0:
        # Raise a descriptive error instead of a bare ZeroDivisionError.
        raise ValueError("Cannot divide by zero.")
    return a / b
@tool
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.
    Args:
        a: first int
        b: second int
    """
    if b == 0:
        # Mirror divide(): report a zero divisor as a descriptive ValueError
        # instead of a bare ZeroDivisionError.
        raise ValueError("Cannot take modulus by zero.")
    return a % b
@tool
def wiki_search(query: str) -> dict[str, str]:
    """Search Wikipedia for a query and return maximum 2 results.
    Args:
        query: The search query."""
    # Returns {"wiki_results": <formatted documents>} on success and
    # {"wiki_results_error": <message>} on failure. Loading and formatting
    # share one try block so every failure is reported as data, not raised.
    try:
        search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
        formatted_search_docs = "\n\n---\n\n".join(
            f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
            for doc in search_docs
        )
        return {"wiki_results": formatted_search_docs}
    except Exception as e:
        return {"wiki_results_error": str(e)}
@tool
def web_search(query: str) -> dict[str, str]:
    """Search Tavily for a query and return maximum 3 results.
    Args:
        query: The search query."""
    # Returns {"web_results": <formatted hits>} on success and
    # {"web_results_error": <message>} on failure.
    try:
        # The tool input is the first positional argument of invoke();
        # passing it as query=... leaves that argument unfilled.
        search_results = TavilySearchResults(max_results=3).invoke(query)

        # Not a list of hits: pass the text through unchanged rather than
        # iterating over its characters.
        if isinstance(search_results, str):
            return {"web_results": search_results}

        formatted_hits = []
        for hit in search_results or []:
            if isinstance(hit, dict):
                # NOTE(review): Tavily hits are expected to be dicts carrying
                # "url" and "content" keys — confirm against the installed
                # langchain_community version.
                source = hit.get("url", "")
                page = hit.get("page", "")
                content = hit.get("content", "")
            else:
                # Document-like object: keep the original attribute access.
                metadata = getattr(hit, "metadata", None) or {}
                source = metadata.get("source", "")
                page = metadata.get("page", "")
                content = getattr(hit, "page_content", str(hit))
            formatted_hits.append(
                f'<Document source="{source}" page="{page}"/>\n{content}\n</Document>'
            )
        return {"web_results": "\n\n---\n\n".join(formatted_hits)}
    except Exception as e:
        return {"web_results_error": str(e)}
@tool
def arvix_search(query: str) -> dict[str, str]:
    """Search Arxiv for a query and return maximum 3 results.
    Args:
        query: The search query."""
    # Returns {"arvix_results": <formatted documents>} on success and
    # {"arvix_results_error": <message>} on failure. Loading and formatting
    # share one try block so every failure is reported as data, not raised.
    try:
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
        formatted_search_docs = "\n\n---\n\n".join(
            # Only the first 1000 characters of each paper are kept.
            f'<Document source="{doc.metadata.get("source", "")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>'
            for doc in search_docs
        )
        return {"arvix_results": formatted_search_docs}
    except Exception as e:
        return {"arvix_results_error": str(e)}
# Read the system prompt text from disk (path is relative to the working
# directory) and wrap it in a SystemMessage.
with open("system_prompt.txt", encoding="utf-8") as f:
    system_prompt = f.read()

sys_msg = SystemMessage(content=system_prompt)
# --- Build a retriever (defensive: don't crash if heavy deps or credentials missing) ---
# All three start as None; each step below only replaces its value when the
# step succeeds.
retriever_tool = None
vector_store = None
embeddings = None

# Step 1: embeddings model. A failure here is reported and tolerated.
try:
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2")  # dim=768
except Exception as exc:
    print(f"⚠️ Could not initialize HuggingFaceEmbeddings: {exc}")
    embeddings = None

# Step 2: Supabase credentials from the environment.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY")

# Step 3: vector store, only when credentials and embeddings are available.
if not (SUPABASE_URL and SUPABASE_SERVICE_KEY):
    print("⚠️ SUPABASE_URL or SUPABASE_SERVICE_KEY not set — skipping vector store initialization.")
    vector_store = None
elif embeddings is None:
    print("⚠️ Embeddings not available — skipping vector store initialization.")
    vector_store = None
else:
    try:
        supabase: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
        vector_store = SupabaseVectorStore(
            client=supabase,
            embedding=embeddings,
            table_name="documents",
            query_name="match_documents_langchain",
        )
    except Exception as exc:
        print(f"⚠️ Could not initialize SupabaseVectorStore: {exc}")
        vector_store = None
# Wrap the vector store in a retriever tool. Without a vector store, or when
# wrapping fails, retriever_tool stays None.
retriever_tool = None
if vector_store is not None:
    try:
        retriever_tool = create_retriever_tool(
            retriever=vector_store.as_retriever(),
            name="Question Search",
            description="A tool to retrieve similar questions from a vector store.",
        )
    except Exception as exc:
        print(f"⚠️ Failed to create retriever tool from vector store: {exc}")
        retriever_tool = None
# Tools offered to the LLM; a retriever tool may be appended afterwards.
tools = [
    # arithmetic helpers
    multiply, add, subtract, divide, modulus,
    # external lookups
    wiki_search, web_search, arvix_search,
]
# Add retriever_tool to tools if available and matches the callable interface
if retriever_tool is not None:
    try:
        if hasattr(retriever_tool, "run"):
            # Expose the retriever through a plain function tool. The
            # docstring is required: @tool uses it as the tool description
            # and refuses to build a tool from an undocumented function.
            @tool
            def retriever_wrapper(query: str) -> str:
                """Retrieve similar questions from the vector store.
                Args:
                    query: The search query."""
                return retriever_tool.run(query)

            tools.append(retriever_wrapper)
        else:
            tools.append(retriever_tool)
    except Exception as e:
        # Best effort: the agent still works without the retriever tool.
        print(f"⚠️ Could not append retriever tool to tools list: {e}")
# Build graph function
def build_graph(provider: str = "google"):
    """Build and compile the agent graph.

    Args:
        provider: Chat model backend to instantiate; one of ``"google"``,
            ``"groq"`` or ``"huggingface"``.

    Returns:
        The compiled graph. In this design it holds a single ``retriever``
        node that is both the entry point and the finish point.

    Raises:
        ValueError: If ``provider`` is not one of the supported names.
    """
    # Function-scope import: AIMessage is not among the module-level imports.
    from langchain_core.messages import AIMessage

    if provider == "google":
        # Google Gemini
        llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0)
    elif provider == "groq":
        # Groq https://console.groq.com/docs/models
        llm = ChatGroq(model="qwen-qwq-32b", temperature=0)  # optional : qwen-qwq-32b gemma2-9b-it
    elif provider == "huggingface":
        # The endpoint address belongs in `endpoint_url`; `url` is not a
        # HuggingFaceEndpoint field.
        # NOTE(review): confirm the model path in this URL is correct.
        llm = ChatHuggingFace(
            llm=HuggingFaceEndpoint(
                endpoint_url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf",
                temperature=0,
            ),
        )
    else:
        raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.")

    # Bind tools to LLM
    try:
        llm_with_tools = llm.bind_tools(tools)
    except Exception as e:
        print(f"⚠️ Could not bind tools to LLM: {e}")
        # fallback: keep LLM without tools
        llm_with_tools = llm

    # Node: assistant. Defined for completeness; the graph assembled below
    # does not add it as a node.
    def assistant(state: MessagesState):
        """Assistant node: run the tool-bound LLM over the conversation."""
        try:
            return {"messages": [llm_with_tools.invoke(state["messages"])]}
        except Exception as e:
            print(f"⚠️ assistant node failed: {e}")
            # Empty AI message (this is the assistant's turn, not the
            # user's) so the graph can continue.
            return {"messages": [AIMessage(content="")]}

    def retriever(state: MessagesState):
        """Retriever node: answer with the closest stored document, if any."""
        messages = state["messages"]
        # With no message to query, or no vector store, reply with an empty
        # message instead of failing.
        if not messages or vector_store is None:
            return {"messages": [AIMessage(content="")]}
        query = messages[-1].content
        try:
            similar_docs = vector_store.similarity_search(query, k=1)
            if not similar_docs:
                return {"messages": [AIMessage(content="")]}
            content = similar_docs[0].page_content
            # When the marker is present, keep only the text after its last
            # occurrence; otherwise return the whole document text.
            if "Final answer :" in content:
                answer = content.split("Final answer :")[-1].strip()
            else:
                answer = content.strip()
            return {"messages": [AIMessage(content=answer)]}
        except Exception as e:
            print(f"⚠️ retriever node failed: {e}")
            return {"messages": [AIMessage(content="")]}

    # Build the state graph: a simple retriever-only entry point (defensive)
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever)
    # Retriever is both the entry and finish point in this design
    builder.set_entry_point("retriever")
    builder.set_finish_point("retriever")

    # Compile graph
    return builder.compile()