Spaces:
Sleeping
Sleeping
| """LangGraph Agent with Best Free Models and Minimal Rate Limits""" | |
| import os, time, random | |
| from dotenv import load_dotenv | |
| from typing import List, Dict, Any, TypedDict, Annotated | |
| import operator | |
| # LangGraph imports | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import tools_condition | |
| from langgraph.prebuilt import ToolNode | |
| from langgraph.checkpoint.memory import MemorySaver | |
| # LangChain imports | |
| from langchain_core.messages import SystemMessage, HumanMessage | |
| from langchain_core.tools import tool | |
| from langchain_groq import ChatGroq | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain_nvidia_ai_endpoints import ChatNVIDIA | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader, ArxivLoader | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_nvidia_ai_endpoints import NVIDIAEmbeddings | |
| from langchain.tools.retriever import create_retriever_tool | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.document_loaders import JSONLoader | |
| load_dotenv() | |
| # Advanced Rate Limiter with Exponential Backoff | |
class AdvancedRateLimiter:
    """Sliding-window (60 s) request limiter with backoff after failures.

    Each call to ``wait_if_needed`` blocks until another request fits in the
    per-minute budget, adds an extra exponential delay while
    ``consecutive_failures`` is non-zero, and then records the request.

    Args:
        requests_per_minute: Maximum number of requests allowed in any
            60-second window.
        provider_name: Label for the provider this limiter guards (stored
            only; not used in any computation here).
        clock: Callable returning the current time in seconds. Defaults to
            ``time.time``.
        sleep: Callable that blocks for the given number of seconds.
            Defaults to ``time.sleep``.
    """

    WINDOW_SECONDS = 60

    def __init__(self, requests_per_minute: int, provider_name: str, *,
                 clock=time.time, sleep=time.sleep):
        self.requests_per_minute = requests_per_minute
        self.provider_name = provider_name
        self.request_times = []
        self.consecutive_failures = 0
        self._clock = clock
        self._sleep = sleep

    def _prune(self, now):
        """Drop recorded requests that have left the 60-second window."""
        self.request_times = [
            t for t in self.request_times if now - t < self.WINDOW_SECONDS
        ]

    def wait_if_needed(self):
        """Block until a request is allowed, then record it."""
        now = self._clock()
        self._prune(now)

        # Window is full: wait until the oldest request expires, plus jitter.
        # The emptiness check avoids an IndexError when the limit is <= 0.
        if self.request_times and len(self.request_times) >= self.requests_per_minute:
            wait_time = (
                self.WINDOW_SECONDS
                - (now - self.request_times[0])
                + random.uniform(2, 8)
            )
            self._sleep(max(wait_time, 0))

        # Extra exponential backoff (capped at 60 s) after recent failures.
        if self.consecutive_failures > 0:
            backoff_time = min(2 ** self.consecutive_failures, 60) + random.uniform(1, 3)
            self._sleep(backoff_time)

        # Re-read the clock: the request happens *after* any sleeping above,
        # so recording the pre-sleep timestamp would under-count the window.
        now = self._clock()
        self._prune(now)
        self.request_times.append(now)

    def record_success(self):
        """Reset the failure streak after a successful request."""
        self.consecutive_failures = 0

    def record_failure(self):
        """Extend the failure streak, lengthening the next backoff."""
        self.consecutive_failures += 1
# One limiter per provider. Each budget is set below the free-tier figure
# quoted in the original author's notes, leaving some headroom.

# Gemini 2.0 Flash-Lite: noted as 30 RPM (highest free tier) -> budget 25.
gemini_limiter = AdvancedRateLimiter(25, "Gemini")

# Groq: noted as typically 30 RPM on the free tier -> budget 25.
groq_limiter = AdvancedRateLimiter(25, "Groq")

# NVIDIA: noted as typically 5 RPM on the free tier -> budget 4.
nvidia_limiter = AdvancedRateLimiter(4, "NVIDIA")
| # Initialize LLMs with best models and minimal rate limits | |
def get_best_models():
    """Create one chat-model client per provider.

    API keys are read from the environment (GOOGLE_API_KEY, GROQ_API_KEY,
    NVIDIA_API_KEY). All clients use temperature 0 and a 4000-token output
    cap.

    Returns:
        Dict mapping "gemini", "groq" and "nvidia" to their client objects.
    """
    env = os.getenv
    return {
        # Gemini 2.0 Flash-Lite
        "gemini": ChatGoogleGenerativeAI(
            model="gemini-2.0-flash-lite",
            api_key=env("GOOGLE_API_KEY"),
            temperature=0,
            max_output_tokens=4000,
        ),
        # Groq-hosted Llama 3.3 70B
        "groq": ChatGroq(
            model="llama-3.3-70b-versatile",
            api_key=env("GROQ_API_KEY"),
            temperature=0,
            max_tokens=4000,
        ),
        # NVIDIA-hosted Llama 3.1 70B
        "nvidia": ChatNVIDIA(
            model="meta/llama-3.1-70b-instruct",
            api_key=env("NVIDIA_API_KEY"),
            temperature=0,
            max_tokens=4000,
        ),
    }
| # Fallback strategy with rate limit handling | |
class ModelFallbackManager:
    """Invoke chat models provider by provider, with rate limiting and retries.

    Attributes:
        models: Dict of provider name -> chat model (from ``get_best_models``).
        limiters: Dict of provider name -> rate limiter for that provider.
        fallback_order: Provider names in the order they are attempted.
    """

    # Substrings (lower-case) that mark an exception as a rate-limit error.
    _RATE_LIMIT_KEYWORDS = ('rate limit', '429', 'quota', 'too many requests')

    def __init__(self):
        self.models = get_best_models()
        self.limiters = {
            "gemini": gemini_limiter,
            "groq": groq_limiter,
            "nvidia": nvidia_limiter
        }
        self.fallback_order = ["gemini", "groq", "nvidia"]  # Order by rate limit capacity

    def invoke_with_fallback(self, messages, max_retries=3):
        """Try each provider in order and return the first successful response.

        A rate-limit error retries the same provider up to ``max_retries``
        times, with a jittered delay between attempts. Any other error
        moves straight on to the next provider.

        Args:
            messages: Input passed unchanged to each model's ``invoke``.
            max_retries: Attempts per provider before falling back.

        Returns:
            Whatever the first successful ``model.invoke(messages)`` returns.

        Raises:
            Exception: If no provider succeeds. The most recent underlying
                error, if any, is attached as ``__cause__``.
        """
        last_error = None
        for provider in self.fallback_order:
            limiter = self.limiters[provider]
            model = self.models[provider]
            for attempt in range(max_retries):
                try:
                    limiter.wait_if_needed()
                    response = model.invoke(messages)
                except Exception as e:
                    last_error = e
                    error_msg = str(e).lower()
                    if not any(keyword in error_msg for keyword in self._RATE_LIMIT_KEYWORDS):
                        # Not a rate-limit problem: report it instead of
                        # swallowing it, then try the next provider.
                        print(f"{provider} failed: {e}")
                        break
                    limiter.record_failure()
                    # Sleep only when another attempt on this provider
                    # follows; after the last attempt we move on at once.
                    if attempt < max_retries - 1:
                        wait_time = (2 ** attempt) + random.uniform(10, 30)
                        time.sleep(wait_time)
                else:
                    limiter.record_success()
                    return response
        # Every provider failed or exhausted its retries.
        raise Exception("All model providers failed or hit rate limits") from last_error
| # Custom Tools | |
def multiply(a: int, b: int) -> int:
    """Multiply two numbers.

    Args:
        a: First factor.
        b: Second factor.

    Returns:
        The product of ``a`` and ``b``.
    """
    return operator.mul(a, b)
def add(a: int, b: int) -> int:
    """Add two numbers.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        The sum of ``a`` and ``b``.
    """
    return operator.add(a, b)
def subtract(a: int, b: int) -> int:
    """Subtract two numbers.

    Args:
        a: Value to subtract from.
        b: Value to subtract.

    Returns:
        ``a`` minus ``b``.
    """
    return operator.sub(a, b)
def divide(a: int, b: int) -> float:
    """Divide two numbers.

    Args:
        a: Dividend.
        b: Divisor; must not be zero.

    Returns:
        The true-division quotient ``a / b``.

    Raises:
        ValueError: If ``b`` is zero.
    """
    if b != 0:
        return a / b
    raise ValueError("Cannot divide by zero.")
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.

    Args:
        a: Dividend.
        b: Divisor; must not be zero.

    Returns:
        ``a % b`` (the result takes the sign of ``b``, per Python's ``%``).

    Raises:
        ValueError: If ``b`` is zero, matching how ``divide`` reports a zero
            divisor instead of leaking a bare ZeroDivisionError.
    """
    if b == 0:
        raise ValueError("Cannot take modulus by zero.")
    return a % b
def wiki_search(query: str) -> str:
    """Search Wikipedia for a query and return maximum 2 results.

    Args:
        query: Search text handed to WikipediaLoader.

    Returns:
        The loaded documents, each wrapped in a ``<Document ...>`` header and
        joined by ``---`` separators, or a "Wikipedia search failed: ..."
        message if anything raises.
    """
    try:
        # Random 1-3 s pause before each lookup.
        time.sleep(random.uniform(1, 3))
        pages = WikipediaLoader(query=query, load_max_docs=2).load()
        rendered = []
        for doc in pages:
            source = doc.metadata["source"]
            page = doc.metadata.get("page", "")
            rendered.append(
                f'<Document source="{source}" page="{page}"/>\n{doc.page_content}\n</Document>'
            )
        return "\n\n---\n\n".join(rendered)
    except Exception as exc:
        return f"Wikipedia search failed: {str(exc)}"
def web_search(query: str) -> str:
    """Search Tavily for a query and return maximum 3 results.

    Args:
        query: Search text sent to the Tavily search tool.

    Returns:
        The results, each wrapped in a ``<Document ...>`` header and joined
        by ``---`` separators, or a "Web search failed: ..." message if the
        search raises or does not return a list of results.
    """
    try:
        # Random 2-5 s pause before each search.
        time.sleep(random.uniform(2, 5))
        # ``invoke`` takes the tool input as its first positional argument;
        # calling it as ``invoke(query=query)`` raises TypeError, which made
        # every search return the failure message.
        search_docs = TavilySearchResults(max_results=3).invoke({"query": query})
        # NOTE(review): the tool appears to return an error string rather
        # than a result list when the Tavily call fails -- confirm.
        if not isinstance(search_docs, list):
            return f"Web search failed: {search_docs}"
        formatted_search_docs = "\n\n---\n\n".join(
            f'<Document source="{doc.get("url", "")}" />\n{doc.get("content", "")}\n</Document>'
            for doc in search_docs
        )
        return formatted_search_docs
    except Exception as e:
        return f"Web search failed: {str(e)}"
def arvix_search(query: str) -> str:
    """Search Arxiv for a query and return maximum 3 results.

    Args:
        query: Search text handed to ArxivLoader.

    Returns:
        The loaded documents (content truncated to 1000 characters each),
        wrapped in ``<Document ...>`` headers and joined by ``---``
        separators, or an "ArXiv search failed: ..." message if anything
        raises.
    """
    try:
        # Random 1-4 s pause before each lookup.
        time.sleep(random.uniform(1, 4))
        search_docs = ArxivLoader(query=query, load_max_docs=3).load()
        formatted = []
        for doc in search_docs:
            meta = doc.metadata
            # A hard ``meta["source"]`` lookup raises KeyError when the key
            # is absent, turning every search into a failure message.
            # NOTE(review): ArxivLoader metadata appears to carry "Title" /
            # "entry_id" rather than "source" -- confirm.
            source = meta.get("source") or meta.get("entry_id") or meta.get("Title", "")
            page = meta.get("page", "")
            formatted.append(
                f'<Document source="{source}" page="{page}"/>\n{doc.page_content[:1000]}\n</Document>'
            )
        return "\n\n---\n\n".join(formatted)
    except Exception as e:
        return f"ArXiv search failed: {str(e)}"
| # Setup FAISS vector store | |
def setup_faiss_vector_store():
    """Build a FAISS vector store from the records in ``metadata.jsonl``.

    Records are reshaped with a jq schema, loaded via JSONLoader, split into
    512-character chunks with 200 characters of overlap, embedded with
    NVIDIAEmbeddings, and indexed.

    Returns:
        The FAISS vector store, or None if any step raises (the error is
        printed).
    """
    try:
        record_schema = """
        {
            page_content: .Question,
            metadata: {
                task_id: .task_id,
                Level: .Level,
                Final_answer: ."Final answer",
                file_name: .file_name,
                Steps: .["Annotator Metadata"].Steps,
                Number_of_steps: .["Annotator Metadata"]["Number of steps"],
                How_long: .["Annotator Metadata"]["How long did this take?"],
                Tools: .["Annotator Metadata"].Tools,
                Number_of_tools: .["Annotator Metadata"]["Number of tools"]
            }
        }
        """
        documents = JSONLoader(
            file_path="metadata.jsonl",
            jq_schema=record_schema,
            json_lines=True,
            text_content=False,
        ).load()
        splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=200)
        chunks = splitter.split_documents(documents)
        embedder = NVIDIAEmbeddings(
            model="nvidia/nv-embedqa-e5-v5",
            api_key=os.getenv("NVIDIA_API_KEY"),
        )
        return FAISS.from_documents(chunks, embedder)
    except Exception as exc:
        print(f"FAISS vector store setup failed: {exc}")
        return None
# System prompt: read from system_prompt.txt when it exists, otherwise use
# the built-in default below.
_DEFAULT_SYSTEM_PROMPT = """You are a helpful assistant tasked with answering questions using a set of tools.
Now, I will ask you a question. Report your thoughts, and finish your answer with the following template:
FINAL ANSWER: [YOUR FINAL ANSWER].
YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings."""

try:
    with open("system_prompt.txt", "r", encoding="utf-8") as prompt_file:
        system_prompt = prompt_file.read()
except FileNotFoundError:
    system_prompt = _DEFAULT_SYSTEM_PROMPT

# Message object wrapping the prompt, shared by the graph nodes.
sys_msg = SystemMessage(content=system_prompt)
# Vector store of reference questions; None when setup failed.
vector_store = setup_faiss_vector_store()

# Expose the store as a retriever tool only when it is available.
retriever_tool = None
if vector_store:
    retriever = vector_store.as_retriever(
        search_type="similarity",
        search_kwargs={"k": 3},
    )
    retriever_tool = create_retriever_tool(
        retriever=retriever,
        name="Question_Search",
        description="A tool to retrieve similar questions from a vector store.",
    )

# Tools offered to the agent: arithmetic helpers, the three search tools,
# and the retriever tool when present.
all_tools = [
    multiply,
    add,
    subtract,
    divide,
    modulus,
    wiki_search,
    web_search,
    arvix_search,
]
if retriever_tool:
    all_tools.append(retriever_tool)
| # Build graph function with fallback manager | |
def build_graph(provider="groq"):
    """Build and compile the LangGraph agent with rate limiting and fallbacks.

    Graph wiring: START -> "retriever" -> "assistant"; ``tools_condition``
    decides the edge leaving "assistant"; "tools" -> "assistant".

    Args:
        provider: Not used by this function; every model call goes through
            ModelFallbackManager's own provider order. Kept so callers that
            pass it keep working.

    Returns:
        The compiled graph, checkpointed with an in-memory MemorySaver.
    """
    fallback_manager = ModelFallbackManager()

    class FallbackLLM:
        """Minimal LLM facade that delegates to the fallback manager."""

        def bind_tools(self, tools):
            self.tools = tools
            # Bind the tools onto every underlying model. Only storing them
            # here means no model ever receives the tool schemas, so no
            # response can carry tool calls and the "tools" node is never
            # reached.
            fallback_manager.models = {
                name: model.bind_tools(tools)
                for name, model in fallback_manager.models.items()
            }
            return self

        def invoke(self, messages):
            return fallback_manager.invoke_with_fallback(messages)

    llm_with_tools = FallbackLLM().bind_tools(all_tools)

    def assistant(state: MessagesState):
        """Assistant node: call the model with the system prompt first."""
        # The system prompt is prepended per call rather than stored in the
        # state, so it is always the first message the model sees.
        return {"messages": [llm_with_tools.invoke([sys_msg] + state["messages"])]}

    def retriever_node(state: MessagesState):
        """Retriever node: add one similar reference question, if available."""
        # Return only NEW messages. MessagesState merges updates by message
        # id and appends unseen ones, so re-emitting the existing messages
        # with sys_msg in front left the system message after the user's
        # question instead of before it.
        if vector_store and len(state["messages"]) > 0:
            try:
                similar_questions = vector_store.similarity_search(
                    state["messages"][-1].content, k=1
                )
                if similar_questions:
                    example_msg = HumanMessage(
                        content=f"Here I provide a similar question and answer for reference: \n\n{similar_questions[0].page_content}",
                    )
                    return {"messages": [example_msg]}
            except Exception as e:
                print(f"Retriever error: {e}")
        return {"messages": []}

    # Assemble the graph.
    builder = StateGraph(MessagesState)
    builder.add_node("retriever", retriever_node)
    builder.add_node("assistant", assistant)
    builder.add_node("tools", ToolNode(all_tools))
    builder.add_edge(START, "retriever")
    builder.add_edge("retriever", "assistant")
    builder.add_conditional_edges("assistant", tools_condition)
    builder.add_edge("tools", "assistant")

    # Compile with an in-memory checkpointer.
    memory = MemorySaver()
    return builder.compile(checkpointer=memory)
| # Test | |
if __name__ == "__main__":
    # Manual smoke test: run one question through the graph and print the
    # resulting conversation.
    graph = build_graph()
    question = "What are the names of the US presidents who were assassinated?"
    config = {"configurable": {"thread_id": "test_thread"}}
    result = graph.invoke({"messages": [HumanMessage(content=question)]}, config)
    for message in result["messages"]:
        message.pretty_print()