"""LangGraph Agent""" import os from dotenv import load_dotenv from langgraph.graph import START, StateGraph, MessagesState from langgraph.prebuilt import tools_condition from langgraph.prebuilt import ToolNode from langchain_google_genai import ChatGoogleGenerativeAI from langchain_groq import ChatGroq from langchain_huggingface import ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings from langchain_community.tools.tavily_search import TavilySearchResults from langchain_community.document_loaders import WikipediaLoader from langchain_community.document_loaders import ArxivLoader from langchain_community.vectorstores import SupabaseVectorStore from langchain_core.messages import SystemMessage, HumanMessage from langchain_core.tools import tool from langchain.tools.retriever import create_retriever_tool from supabase.client import Client, create_client from langfuse.langchain import CallbackHandler # Initialize Langfuse CallbackHandler for LangGraph/Langchain (tracing) try: langfuse_handler = CallbackHandler() except Exception as e: print(f"Warning: Could not initialize Langfuse handler: {e}") langfuse_handler = None # Load environment variables - try multiple files load_dotenv() # Try .env first load_dotenv("env.local") # Try env.local as backup print(f"SUPABASE_URL loaded: {bool(os.environ.get('SUPABASE_URL'))}") print(f"GROQ_API_KEY loaded: {bool(os.environ.get('GROQ_API_KEY'))}") @tool def multiply(a: int, b: int) -> int: """Multiply two numbers. Args: a: first int b: second int """ return a * b @tool def add(a: int, b: int) -> int: """Add two numbers. Args: a: first int b: second int """ return a + b @tool def subtract(a: int, b: int) -> int: """Subtract two numbers. Args: a: first int b: second int """ return a - b @tool def divide(a: int, b: int) -> int: """Divide two numbers. Args: a: first int b: second int """ if b == 0: raise ValueError("Cannot divide by zero.") return a / b @tool def modulus(a: int, b: int) -> int: """Get the modulus of two numbers. Args: a: first int b: second int """ return a % b @tool def wiki_search(input: str) -> str: """Search Wikipedia for a query and return maximum 2 results. Args: input: The search query.""" try: search_docs = WikipediaLoader(query=input, load_max_docs=2).load() if not search_docs: return {"wiki_results": "No Wikipedia results found for the query."} formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ]) return {"wiki_results": formatted_search_docs} except Exception as e: print(f"Error in wiki_search: {e}") return {"wiki_results": f"Error searching Wikipedia: {e}"} @tool def web_search(input: str) -> str: """Search Tavily for a query and return maximum 3 results. Args: input: The search query.""" try: search_docs = TavilySearchResults(max_results=3).invoke(query=input) if not search_docs: return {"web_results": "No web search results found for the query."} formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.get("content", "No content")}\n' for doc in search_docs ]) return {"web_results": formatted_search_docs} except Exception as e: print(f"Error in web_search: {e}") return {"web_results": f"Error searching web: {e}"} @tool def arvix_search(input: str) -> str: """Search Arxiv for a query and return maximum 3 result. Args: input: The search query.""" try: search_docs = ArxivLoader(query=input, load_max_docs=3).load() if not search_docs: return {"arvix_results": "No Arxiv results found for the query."} formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content[:1000]}\n' for doc in search_docs ]) return {"arvix_results": formatted_search_docs} except Exception as e: print(f"Error in arvix_search: {e}") return {"arvix_results": f"Error searching Arxiv: {e}"} # load the system prompt from the file with open("system_prompt.txt", "r", encoding="utf-8") as f: system_prompt = f.read() # System message sys_msg = SystemMessage(content=system_prompt) # build a retriever embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-mpnet-base-v2") # dim=768 # Try to create Supabase client with error handling try: supabase_url = os.environ.get("SUPABASE_URL") supabase_key = os.environ.get("SUPABASE_SERVICE_KEY") if not supabase_url or not supabase_key: print("Warning: Supabase credentials not found, vector store will be disabled") vector_store = None create_retriever_tool = None else: supabase: Client = create_client(supabase_url, supabase_key) vector_store = SupabaseVectorStore( client=supabase, embedding= embeddings, table_name="documents", query_name="match_documents_langchain", ) create_retriever_tool = create_retriever_tool( retriever=vector_store.as_retriever(), name="Question Search", description="A tool to retrieve similar questions from a vector store.", ) except Exception as e: print(f"Warning: Could not initialize Supabase vector store: {e}") vector_store = None create_retriever_tool = None tools = [ multiply, add, subtract, divide, modulus, wiki_search, web_search, arvix_search, ] if create_retriever_tool: tools.append(create_retriever_tool) # Build graph function def build_graph(provider: str = "groq"): """Build the graph""" # Load environment variables from .env file if provider == "google": # Google Gemini llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0) elif provider == "groq": # Groq https://console.groq.com/docs/models llm = ChatGroq(model="qwen-qwq-32b", temperature=0) # optional : qwen-qwq-32b gemma2-9b-it elif provider == "huggingface": # TODO: Add huggingface endpoint llm = ChatHuggingFace( llm=HuggingFaceEndpoint( url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf", temperature=0, ), ) else: raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.") # Bind tools to LLM llm_with_tools = llm.bind_tools(tools) # Node def assistant(state: MessagesState): """Assistant node""" try: print(f"Assistant node: Processing {len(state['messages'])} messages") result = llm_with_tools.invoke(state["messages"]) print(f"Assistant node: LLM returned result type: {type(result)}") return {"messages": [result]} except Exception as e: print(f"Error in assistant node: {e}") from langchain_core.messages import AIMessage error_msg = AIMessage(content=f"I encountered an error: {e}") return {"messages": [error_msg]} def retriever(state: MessagesState): """Retriever node""" try: print(f"Retriever node: Processing {len(state['messages'])} messages") if not state["messages"]: print("Retriever node: No messages in state") return {"messages": [sys_msg]} if not vector_store: print("Retriever node: Vector store not available, skipping retrieval") return {"messages": [sys_msg] + state["messages"]} query_content = state["messages"][0].content print(f"Retriever node: Searching for similar questions with query: {query_content[:100]}...") similar_question = vector_store.similarity_search(query_content) print(f"Retriever node: Found {len(similar_question)} similar questions") if not similar_question: print("Retriever node: No similar questions found, proceeding without example") return {"messages": [sys_msg] + state["messages"]} example_msg = HumanMessage( content=f"Here I provide a similar question and answer for reference: \n\n{similar_question[0].page_content}", ) print(f"Retriever node: Added example message from similar question") return {"messages": [sys_msg] + state["messages"] + [example_msg]} except Exception as e: print(f"Error in retriever node: {e}") return {"messages": [sys_msg] + state["messages"]} builder = StateGraph(MessagesState) builder.add_node("retriever", retriever) builder.add_node("assistant", assistant) builder.add_node("tools", ToolNode(tools)) builder.add_edge(START, "retriever") builder.add_edge("retriever", "assistant") builder.add_conditional_edges( "assistant", tools_condition, ) builder.add_edge("tools", "assistant") # Compile graph return builder.compile() # test if __name__ == "__main__": question = "When was a picture of St. Thomas Aquinas first added to the Wikipedia page on the Principle of double effect?" # Build the graph graph = build_graph(provider="groq") # Run the graph messages = [HumanMessage(content=question)] messages = graph.invoke({"messages": messages}, config={"callbacks": [langfuse_handler]}) for m in messages["messages"]: m.pretty_print()