Spaces:
Sleeping
Sleeping
| """ | |
| Final Working Multi-LLM Agent System | |
| Robust fallback system that works even when Agno fails | |
| """ | |
| import os | |
| import time | |
| import random | |
| import operator | |
| from typing import List, Dict, Any, TypedDict, Annotated, Optional | |
| from dotenv import load_dotenv | |
| # Core LangChain imports | |
| from langchain_core.tools import tool | |
| from langchain_community.tools.tavily_search import TavilySearchResults | |
| from langchain_community.document_loaders import WikipediaLoader | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.checkpoint.memory import MemorySaver | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from langchain_groq import ChatGroq | |
| load_dotenv() | |
| # System prompt for proper question answering | |
| SYSTEM_PROMPT = """You are a helpful assistant tasked with answering questions using available tools. | |
| Guidelines: | |
| 1. Use available tools to gather information when needed | |
| 2. Provide precise, factual answers | |
| 3. For numbers: don't use commas or units unless specified | |
| 4. For strings: don't use articles or abbreviations, write digits in plain text | |
| 5. Always end with 'FINAL ANSWER: [YOUR ANSWER]' | |
| 6. Be concise but thorough | |
| 7. If you cannot find the answer, state that clearly""" | |
| # ---- Tool Definitions ---- | |
| def multiply(a: int, b: int) -> int: | |
| """Multiply two integers and return the product.""" | |
| return a * b | |
| def add(a: int, b: int) -> int: | |
| """Add two integers and return the sum.""" | |
| return a + b | |
| def subtract(a: int, b: int) -> int: | |
| """Subtract the second integer from the first and return the difference.""" | |
| return a - b | |
| def divide(a: int, b: int) -> float: | |
| """Divide the first integer by the second and return the quotient.""" | |
| if b == 0: | |
| raise ValueError("Cannot divide by zero.") | |
| return a / b | |
| def modulus(a: int, b: int) -> int: | |
| """Return the remainder when dividing the first integer by the second.""" | |
| return a % b | |
| def web_search(query: str) -> str: | |
| """Search the web for information.""" | |
| try: | |
| if os.getenv("TAVILY_API_KEY"): | |
| time.sleep(random.uniform(0.5, 1.0)) | |
| search_tool = TavilySearchResults(max_results=3) | |
| docs = search_tool.invoke({"query": query}) | |
| return "\n\n---\n\n".join( | |
| f"<Doc url='{d.get('url','')}'>{d.get('content','')[:600]}</Doc>" | |
| for d in docs | |
| ) | |
| else: | |
| return "Web search not available - no API key" | |
| except Exception as e: | |
| return f"Web search failed: {e}" | |
| def wiki_search(query: str) -> str: | |
| """Search Wikipedia for information.""" | |
| try: | |
| time.sleep(random.uniform(0.3, 0.8)) | |
| docs = WikipediaLoader(query=query, load_max_docs=2).load() | |
| return "\n\n---\n\n".join( | |
| f"<Doc src='Wikipedia'>{d.page_content[:800]}</Doc>" | |
| for d in docs | |
| ) | |
| except Exception as e: | |
| return f"Wikipedia search failed: {e}" | |
| # ---- Enhanced Agent State ---- | |
| class EnhancedAgentState(TypedDict): | |
| messages: Annotated[List[HumanMessage | AIMessage], operator.add] | |
| query: str | |
| agent_type: str | |
| final_answer: str | |
| perf: Dict[str, Any] | |
| tools_used: List[str] | |
| # ---- Working Multi-LLM System ---- | |
| class WorkingMultiLLMSystem: | |
| """Reliable multi-LLM system that actually works""" | |
| def __init__(self): | |
| self.tools = [multiply, add, subtract, divide, modulus, web_search, wiki_search] | |
| self.graph = self._build_graph() | |
| print("✅ Working Multi-LLM System initialized") | |
| def _get_llm(self, model_name: str = "llama3-70b-8192"): | |
| """Get Groq LLM instance""" | |
| return ChatGroq( | |
| model=model_name, | |
| temperature=0, | |
| api_key=os.getenv("GROQ_API_KEY") | |
| ) | |
| def _build_graph(self) -> StateGraph: | |
| """Build the working LangGraph system""" | |
| def router(st: EnhancedAgentState) -> EnhancedAgentState: | |
| """Route queries to appropriate processing""" | |
| q = st["query"].lower() | |
| if any(keyword in q for keyword in ["calculate", "multiply", "add", "subtract", "divide", "math"]): | |
| agent_type = "math" | |
| elif any(keyword in q for keyword in ["search", "find", "information", "about"]): | |
| agent_type = "search" | |
| elif any(keyword in q for keyword in ["wikipedia", "wiki"]): | |
| agent_type = "wiki" | |
| else: | |
| agent_type = "general" | |
| return {**st, "agent_type": agent_type, "tools_used": []} | |
| def math_node(st: EnhancedAgentState) -> EnhancedAgentState: | |
| """Handle mathematical queries""" | |
| t0 = time.time() | |
| try: | |
| llm = self._get_llm("llama3-70b-8192") | |
| enhanced_query = f""" | |
| Question: {st["query"]} | |
| This is a mathematical question. Please solve it step by step and provide the exact numerical answer. | |
| """ | |
| sys_msg = SystemMessage(content=SYSTEM_PROMPT) | |
| response = llm.invoke([sys_msg, HumanMessage(content=enhanced_query)]) | |
| answer = response.content.strip() | |
| if "FINAL ANSWER:" in answer: | |
| answer = answer.split("FINAL ANSWER:")[-1].strip() | |
| return {**st, | |
| "final_answer": answer, | |
| "perf": {"time": time.time() - t0, "provider": "Groq-Math"}} | |
| except Exception as e: | |
| return {**st, "final_answer": f"Error: {e}", "perf": {"error": str(e)}} | |
| def search_node(st: EnhancedAgentState) -> EnhancedAgentState: | |
| """Handle search queries""" | |
| t0 = time.time() | |
| try: | |
| # Perform web search | |
| search_results = web_search.invoke({"query": st["query"]}) | |
| llm = self._get_llm("llama3-70b-8192") | |
| enhanced_query = f""" | |
| Question: {st["query"]} | |
| Search Results: | |
| {search_results} | |
| Based on the search results above, provide a direct answer to the question. | |
| """ | |
| sys_msg = SystemMessage(content=SYSTEM_PROMPT) | |
| response = llm.invoke([sys_msg, HumanMessage(content=enhanced_query)]) | |
| answer = response.content.strip() | |
| if "FINAL ANSWER:" in answer: | |
| answer = answer.split("FINAL ANSWER:")[-1].strip() | |
| return {**st, | |
| "final_answer": answer, | |
| "tools_used": ["web_search"], | |
| "perf": {"time": time.time() - t0, "provider": "Groq-Search"}} | |
| except Exception as e: | |
| return {**st, "final_answer": f"Error: {e}", "perf": {"error": str(e)}} | |
| def wiki_node(st: EnhancedAgentState) -> EnhancedAgentState: | |
| """Handle Wikipedia queries""" | |
| t0 = time.time() | |
| try: | |
| # Perform Wikipedia search | |
| wiki_results = wiki_search.invoke({"query": st["query"]}) | |
| llm = self._get_llm("llama3-70b-8192") | |
| enhanced_query = f""" | |
| Question: {st["query"]} | |
| Wikipedia Results: | |
| {wiki_results} | |
| Based on the Wikipedia information above, provide a direct answer to the question. | |
| """ | |
| sys_msg = SystemMessage(content=SYSTEM_PROMPT) | |
| response = llm.invoke([sys_msg, HumanMessage(content=enhanced_query)]) | |
| answer = response.content.strip() | |
| if "FINAL ANSWER:" in answer: | |
| answer = answer.split("FINAL ANSWER:")[-1].strip() | |
| return {**st, | |
| "final_answer": answer, | |
| "tools_used": ["wiki_search"], | |
| "perf": {"time": time.time() - t0, "provider": "Groq-Wiki"}} | |
| except Exception as e: | |
| return {**st, "final_answer": f"Error: {e}", "perf": {"error": str(e)}} | |
| def general_node(st: EnhancedAgentState) -> EnhancedAgentState: | |
| """Handle general queries""" | |
| t0 = time.time() | |
| try: | |
| llm = self._get_llm("llama3-70b-8192") | |
| enhanced_query = f""" | |
| Question: {st["query"]} | |
| Please provide a direct, accurate answer to this question. | |
| """ | |
| sys_msg = SystemMessage(content=SYSTEM_PROMPT) | |
| response = llm.invoke([sys_msg, HumanMessage(content=enhanced_query)]) | |
| answer = response.content.strip() | |
| if "FINAL ANSWER:" in answer: | |
| answer = answer.split("FINAL ANSWER:")[-1].strip() | |
| return {**st, | |
| "final_answer": answer, | |
| "perf": {"time": time.time() - t0, "provider": "Groq-General"}} | |
| except Exception as e: | |
| return {**st, "final_answer": f"Error: {e}", "perf": {"error": str(e)}} | |
| # Build graph | |
| g = StateGraph(EnhancedAgentState) | |
| g.add_node("router", router) | |
| g.add_node("math", math_node) | |
| g.add_node("search", search_node) | |
| g.add_node("wiki", wiki_node) | |
| g.add_node("general", general_node) | |
| g.set_entry_point("router") | |
| g.add_conditional_edges("router", lambda s: s["agent_type"], { | |
| "math": "math", | |
| "search": "search", | |
| "wiki": "wiki", | |
| "general": "general" | |
| }) | |
| for node in ["math", "search", "wiki", "general"]: | |
| g.add_edge(node, END) | |
| return g.compile(checkpointer=MemorySaver()) | |
| def process_query(self, query: str) -> str: | |
| """Process a query through the working system""" | |
| state = { | |
| "messages": [HumanMessage(content=query)], | |
| "query": query, | |
| "agent_type": "", | |
| "final_answer": "", | |
| "perf": {}, | |
| "tools_used": [] | |
| } | |
| config = {"configurable": {"thread_id": f"working_{hash(query)}"}} | |
| try: | |
| result = self.graph.invoke(state, config) | |
| answer = result.get("final_answer", "").strip() | |
| # Validation | |
| if not answer or answer == query or len(answer.strip()) == 0: | |
| return "Information not available" | |
| return answer | |
| except Exception as e: | |
| return f"Error processing query: {e}" | |
| # ---- Compatibility Classes ---- | |
| class UnifiedAgnoEnhancedSystem: | |
| """Compatibility wrapper for the working system""" | |
| def __init__(self): | |
| print("Initializing working system...") | |
| self.agno_system = None # Not using Agno | |
| self.working_system = WorkingMultiLLMSystem() | |
| self.graph = self.working_system.graph | |
| def process_query(self, query: str) -> str: | |
| return self.working_system.process_query(query) | |
| def get_system_info(self) -> Dict[str, Any]: | |
| return { | |
| "system": "working_multi_llm", | |
| "agno_available": False, | |
| "total_models": 1, | |
| "active_agents": ["math", "search", "wiki", "general"] | |
| } | |
| # For backward compatibility | |
| AgnoEnhancedAgentSystem = WorkingMultiLLMSystem | |
| AgnoEnhancedModelManager = WorkingMultiLLMSystem | |
| def build_graph(provider: str = "working"): | |
| """Build working graph""" | |
| system = WorkingMultiLLMSystem() | |
| return system.graph | |
| if __name__ == "__main__": | |
| # Test the working system | |
| system = WorkingMultiLLMSystem() | |
| test_questions = [ | |
| "How many studio albums were published by Mercedes Sosa between 2000 and 2009?", | |
| "What is 25 multiplied by 17?", | |
| "Who nominated the only Featured Article on English Wikipedia about a dinosaur?" | |
| ] | |
| print("Testing Working Multi-LLM System:") | |
| for i, question in enumerate(test_questions, 1): | |
| print(f"\nQuestion {i}: {question}") | |
| answer = system.process_query(question) | |
| print(f"Answer: {answer}") | |