Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| import numpy as np | |
| from dotenv import load_dotenv | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import tools_condition | |
| from langgraph.prebuilt import ToolNode | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_groq import ChatGroq | |
| from langchain_huggingface import ( | |
| ChatHuggingFace, | |
| HuggingFaceEndpoint, | |
| HuggingFaceEmbeddings, | |
| ) | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langchain_community.document_loaders import ArxivLoader | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from langchain_core.tools import tool | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import ast | |
| load_dotenv() | |
| def multiply(a: int, b: int) -> int: | |
| """Multiply two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a * b | |
| def add(a: int, b: int) -> int: | |
| """Add two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a + b | |
| def subtract(a: int, b: int) -> int: | |
| """Subtract two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a - b | |
| def divide(a: int, b: int) -> int: | |
| """Divide two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| if b == 0: | |
| raise ValueError("Cannot divide by zero.") | |
| return a / b | |
| def modulus(a: int, b: int) -> int: | |
| """Get the modulus of two numbers. | |
| Args: | |
| a: first int | |
| b: second int | |
| """ | |
| return a % b | |
| def wiki_search(query: str) -> str: | |
| """Search Wikipedia for a query and return maximum 2 results. | |
| Args: | |
| query: The search query.""" | |
| search_docs = WikipediaLoader(query=query, load_max_docs=2).load() | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
| for doc in search_docs | |
| ] | |
| ) | |
| return {"wiki_results": formatted_search_docs} | |
| def web_search(query: str) -> str: | |
| """Search Tavily for a query and return maximum 3 results. | |
| Args: | |
| query: The search query.""" | |
| search_docs = TavilySearchResults(max_results=3).invoke(query=query) | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>' | |
| for doc in search_docs | |
| ] | |
| ) | |
| return {"web_results": formatted_search_docs} | |
| def arvix_search(query: str) -> str: | |
| """Search Arxiv for a query and return maximum 3 result. | |
| Args: | |
| query: The search query.""" | |
| search_docs = ArxivLoader(query=query, load_max_docs=3).load() | |
| formatted_search_docs = "\n\n---\n\n".join( | |
| [ | |
| f'<Document source="{doc.metadata["source"]}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content[:1000]}\n</Document>' | |
| for doc in search_docs | |
| ] | |
| ) | |
| return {"arvix_results": formatted_search_docs} | |
| # Load CSV data and embeddings | |
| class LocalCSVRetriever: | |
| def __init__(self, csv_file_path="supabase_docs.csv"): | |
| self.csv_file_path = csv_file_path | |
| self.df = None | |
| self.embeddings_model = HuggingFaceEmbeddings( | |
| model_name="sentence-transformers/all-mpnet-base-v2" | |
| ) | |
| self.load_data() | |
| def load_data(self): | |
| """Load data from CSV file""" | |
| try: | |
| self.df = pd.read_csv(self.csv_file_path) | |
| print(f"Loaded {len(self.df)} documents from {self.csv_file_path}") | |
| # Convert string representation of embeddings back to numpy arrays | |
| if 'embedding' in self.df.columns: | |
| self.df['embedding_array'] = self.df['embedding'].apply( | |
| lambda x: np.array(ast.literal_eval(x)) if isinstance(x, str) else np.array(x) | |
| ) | |
| except FileNotFoundError: | |
| print(f"CSV file {self.csv_file_path} not found!") | |
| self.df = pd.DataFrame() | |
| except Exception as e: | |
| print(f"Error loading CSV: {e}") | |
| self.df = pd.DataFrame() | |
| def similarity_search(self, query: str, k: int = 1): | |
| """Perform similarity search on local data""" | |
| if self.df.empty: | |
| return [] | |
| # Get query embedding | |
| query_embedding = self.embeddings_model.embed_query(query) | |
| query_embedding = np.array(query_embedding).reshape(1, -1) | |
| # Calculate similarities | |
| similarities = [] | |
| for idx, row in self.df.iterrows(): | |
| doc_embedding = row['embedding_array'].reshape(1, -1) | |
| similarity = cosine_similarity(query_embedding, doc_embedding)[0][0] | |
| similarities.append((idx, similarity, row['content'])) | |
| # Sort by similarity and return top k | |
| similarities.sort(key=lambda x: x[1], reverse=True) | |
| # Create simple document-like objects | |
| results = [] | |
| for i in range(min(k, len(similarities))): | |
| idx, sim_score, content = similarities[i] | |
| # Create a simple object with page_content attribute | |
| doc = type('Document', (), { | |
| 'page_content': content, | |
| 'metadata': ast.literal_eval(self.df.iloc[idx]['metadata']) if isinstance(self.df.iloc[idx]['metadata'], str) else self.df.iloc[idx]['metadata'] | |
| })() | |
| results.append(doc) | |
| return results | |
| # Initialize the local retriever | |
| local_retriever = LocalCSVRetriever() | |
| # load the system prompt from the file | |
| with open("system_prompt.txt", "r", encoding="utf-8") as f: | |
| system_prompt = f.read() | |
| # System message | |
| sys_msg = SystemMessage(content=system_prompt) | |
| tools = [ | |
| multiply, | |
| add, | |
| subtract, | |
| divide, | |
| modulus, | |
| wiki_search, | |
| web_search, | |
| arvix_search, | |
| ] | |
| # Build graph function | |
| def build_graph(provider: str = "groq"): | |
| """Build the graph""" | |
| # Load environment variables from .env file | |
| if provider == "google": | |
| # Google Gemini | |
| llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash", temperature=0) | |
| elif provider == "groq": | |
| # Groq https://console.groq.com/docs/models | |
| llm = ChatGroq( | |
| model="qwen-qwq-32b", temperature=0 | |
| ) # optional : qwen-qwq-32b gemma2-9b-it | |
| elif provider == "huggingface": | |
| # TODO: Add huggingface endpoint | |
| llm = ChatHuggingFace( | |
| llm=HuggingFaceEndpoint( | |
| url="https://api-inference.huggingface.co/models/Meta-DeepLearning/llama-2-7b-chat-hf", | |
| temperature=0, | |
| ), | |
| ) | |
| else: | |
| raise ValueError("Invalid provider. Choose 'google', 'groq' or 'huggingface'.") | |
| # Bind tools to LLM | |
| llm_with_tools = llm.bind_tools(tools) | |
| # Node | |
| def assistant(state: MessagesState): | |
| """Assistant node""" | |
| return {"messages": [llm_with_tools.invoke(state["messages"])]} | |
| def retriever(state: MessagesState): | |
| """Modified retriever to use local CSV data""" | |
| query = state["messages"][-1].content | |
| similar_docs = local_retriever.similarity_search(query, k=1) | |
| # Handle empty results | |
| if not similar_docs: | |
| return { | |
| "messages": [ | |
| AIMessage( | |
| content="I don't have information about this topic in my knowledge base. Please try a different question." | |
| ) | |
| ] | |
| } | |
| similar_doc = similar_docs[0] | |
| content = similar_doc.page_content | |
| if "Final answer :" in content: | |
| answer = content.split("Final answer :")[-1].strip() | |
| else: | |
| answer = content.strip() | |
| # Ensure answer is not empty | |
| if not answer: | |
| answer = "I found related information but couldn't extract a clear answer. Please rephrase your question." | |
| return {"messages": [AIMessage(content=answer)]} | |
| builder = StateGraph(MessagesState) | |
| builder.add_node("retriever", retriever) | |
| # Retriever ist Start und Endpunkt | |
| builder.set_entry_point("retriever") | |
| builder.set_finish_point("retriever") | |
| # Compile graph | |
| return builder.compile() |