import os import pandas as pd import numpy as np from dotenv import load_dotenv from langgraph.graph import START, StateGraph, MessagesState from langgraph.prebuilt import tools_condition from langgraph.prebuilt import ToolNode from langchain_google_genai import ChatGoogleGenerativeAI from langchain_groq import ChatGroq from langchain_huggingface import ( ChatHuggingFace, HuggingFaceEndpoint, HuggingFaceEmbeddings, ) from langchain_community.tools.tavily_search import TavilySearchResults from langchain_community.document_loaders import WikipediaLoader from langchain_community.document_loaders import ArxivLoader from langchain_core.messages import SystemMessage, HumanMessage, AIMessage from langchain_core.tools import tool from sklearn.metrics.pairwise import cosine_similarity import ast load_dotenv() @tool def multiply(a: int, b: int) -> int: """Multiply two numbers. Args: a: first int b: second int """ return a * b @tool def add(a: int, b: int) -> int: """Add two numbers. Args: a: first int b: second int """ return a + b @tool def subtract(a: int, b: int) -> int: """Subtract two numbers. Args: a: first int b: second int """ return a - b @tool def divide(a: int, b: int) -> int: """Divide two numbers. Args: a: first int b: second int """ if b == 0: raise ValueError("Cannot divide by zero.") return a / b @tool def modulus(a: int, b: int) -> int: """Get the modulus of two numbers. Args: a: first int b: second int """ return a % b @tool def wiki_search(query: str) -> str: """Search Wikipedia for a query and return maximum 2 results. Args: query: The search query.""" search_docs = WikipediaLoader(query=query, load_max_docs=2).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ] ) return {"wiki_results": formatted_search_docs} @tool def web_search(query: str) -> str: """Search Tavily for a query and return maximum 3 results. Args: query: The search query.""" search_docs = TavilySearchResults(max_results=3).invoke(query=query) formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content}\n' for doc in search_docs ] ) return {"web_results": formatted_search_docs} @tool def arvix_search(query: str) -> str: """Search Arxiv for a query and return maximum 3 result. Args: query: The search query.""" search_docs = ArxivLoader(query=query, load_max_docs=3).load() formatted_search_docs = "\n\n---\n\n".join( [ f'\n{doc.page_content[:1000]}\n' for doc in search_docs ] ) return {"arvix_results": formatted_search_docs} # Load CSV data and embeddings class LocalCSVRetriever: def __init__(self, csv_file_path="supabase_docs.csv"): self.csv_file_path = csv_file_path self.df = None self.embeddings_model = HuggingFaceEmbeddings( model_name="sentence-transformers/all-mpnet-base-v2" ) self.load_data() def load_data(self): """Load data from CSV file""" try: self.df = pd.read_csv(self.csv_file_path) print(f"Loaded {len(self.df)} documents from {self.csv_file_path}") # Convert string representation of embeddings back to numpy arrays if 'embedding' in self.df.columns: self.df['embedding_array'] = self.df['embedding'].apply( lambda x: np.array(ast.literal_eval(x)) if isinstance(x, str) else np.array(x) ) except FileNotFoundError: print(f"CSV file {self.csv_file_path} not found!") self.df = pd.DataFrame() except Exception as e: print(f"Error loading CSV: {e}") self.df = pd.DataFrame() def similarity_search(self, query: str, k: int = 1): """Perform similarity search on local data""" if self.df.empty: return [] # Get query embedding query_embedding = self.embeddings_model.embed_query(query) query_embedding = np.array(query_embedding).reshape(1, -1) # Calculate similarities similarities = [] for idx, row in self.df.iterrows(): doc_embedding = row['embedding_array'].reshape(1, -1) similarity = cosine_similarity(query_embedding, doc_embedding)[0][0] similarities.append((idx, similarity, row['content'])) # Sort by similarity and return top k similarities.sort(key=lambda x: x[1], reverse=True) # Create simple document-like objects results = [] for i in range(min(k, len(similarities))): idx, sim_score, content = similarities[i] # Create a simple object with page_content attribute doc = type('Document', (), { 'page_content': content, 'metadata': ast.literal_eval(self.df.iloc[idx]['metadata']) if isinstance(self.df.iloc[idx]['metadata'], str) else self.df.iloc[idx]['metadata'] })() results.append(doc) return results # Initialize the local retriever local_retriever = LocalCSVRetriever() # load the system prompt from the file with open("system_prompt.txt", "r", encoding="utf-8") as f: system_prompt = f.read() # System message sys_msg = SystemMessage(content=system_prompt) tools = [ multiply, add, subtract, divide, modulus, wiki_search, web_search, arvix_search, ] # Build graph function def build_graph(provider: str = "groq"): """Build the graph""" # Load environment variables from .env file if provider == "google": # Google Gemini llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0) elif provider == "groq": # Groq https://console.groq.com/docs/models llm = ChatGroq( model="qwen-qwq-32b", temperature=0 ) # optional : qwen-qwq-32b gemma2-9b-it elif provider == "huggingface": # TODO: Add huggingface endpoint llm = ChatHuggingFace( llm=HuggingFaceEndpoint( url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf", temperature=0, ), ) else: raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.") # Bind tools to LLM llm_with_tools = llm.bind_tools(tools) # Node def assistant(state: MessagesState): """Assistant node""" return {"messages": [llm_with_tools.invoke(state["messages"])]} def retriever(state: MessagesState): """Modified retriever to use local CSV data""" query = state["messages"][-1].content similar_docs = local_retriever.similarity_search(query, k=1) # Handle empty results if not similar_docs: return { "messages": [ AIMessage( content="I don't have information about this topic in my knowledge base. Please try a different question." ) ] } similar_doc = similar_docs[0] content = similar_doc.page_content if "Final answer :" in content: answer = content.split("Final answer :")[-1].strip() else: answer = content.strip() # Ensure answer is not empty if not answer: answer = "I found related information but couldn't extract a clear answer. Please rephrase your question." return {"messages": [AIMessage(content=answer)]} builder = StateGraph(MessagesState) builder.add_node("retriever", retriever) # Retriever ist Start und Endpunkt builder.set_entry_point("retriever") builder.set_finish_point("retriever") # Compile graph return builder.compile()