| """ | |
| Enhanced Agno Multi-LLM Agent System with NVIDIA Integration | |
| Uses open-source models + NVIDIA NIM models available through Agno framework | |
| """ | |
| import os | |
| import time | |
| import random | |
| import operator | |
| from typing import List, Dict, Any, TypedDict, Annotated, Optional | |
| from dotenv import load_dotenv | |
| from datetime import datetime | |
| from textwrap import dedent | |
| # Core LangChain imports for compatibility | |
| from langchain_core.tools import tool | |
| from langchain_core.messages import SystemMessage, HumanMessage, AIMessage | |
| from langgraph.graph import StateGraph, END | |
| from langgraph.checkpoint.memory import MemorySaver | |

# Agno imports for open-source models + NVIDIA
try:
    from agno.agent import Agent
    from agno.models.groq import Groq
    from agno.models.ollama import Ollama
    from agno.models.together import Together
    from agno.models.anyscale import Anyscale
    from agno.models.huggingface import HuggingFaceChat
    from agno.models.nvidia import Nvidia  # NVIDIA NIM integration
    from agno.tools.duckduckgo import DuckDuckGoTools
    from agno.tools.wikipedia import WikipediaTools
    from agno.tools.calculator import Calculator
    from agno.tools.reasoning import ReasoningTools
    from agno.memory import AgentMemory
    from agno.storage import AgentStorage
    from agno.knowledge import AgentKnowledge
    AGNO_AVAILABLE = True
except ImportError:
    AGNO_AVAILABLE = False
    print("Agno not available. Install with: pip install agno")

# Vector database imports (not used directly in this module; retained for
# optional vector-store features elsewhere in the project)
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
import json

load_dotenv()
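
# Provider credentials are read from the environment. A minimal .env sketch,
# with key names matching the os.getenv() calls below (values are placeholders;
# any subset works, and Ollama needs no key):
#   NVIDIA_API_KEY=nvapi-...
#   GROQ_API_KEY=gsk_...
#   TOGETHER_API_KEY=...
#   ANYSCALE_API_KEY=...
#   HUGGINGFACE_API_KEY=hf_...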

# Enhanced system prompt for Agno agents
AGNO_SYSTEM_PROMPT = dedent("""\
    You are a helpful assistant tasked with answering questions using available tools.
    You must provide accurate, comprehensive answers based on available information.

    Your capabilities include:
    - Using search tools to find current information
    - Performing mathematical calculations
    - Reasoning through complex problems step by step
    - Accessing Wikipedia for encyclopedic knowledge

    Guidelines:
    1. Use available tools to gather information when needed
    2. Provide precise, factual answers
    3. For numbers: don't use commas or units unless specified
    4. For strings: don't use articles or abbreviations; write digits in plain text
    5. For lists: apply the above rules based on element type
    6. Always end with 'FINAL ANSWER: [YOUR ANSWER]'
    7. Be concise but thorough in your reasoning
    8. If you cannot find the answer, state that clearly
    """)


# ---- Enhanced Model Manager with NVIDIA Support ----
class AgnoEnhancedModelManager:
    """Manages open-source models + NVIDIA NIM models available through Agno"""

    def __init__(self):
        self.available_models = {}
        self._initialize_all_models()

    def _initialize_all_models(self):
        """Initialize open-source models + NVIDIA NIM models through Agno"""
        if not AGNO_AVAILABLE:
            return

        # 1. NVIDIA NIM Models (Enterprise-grade open-source models)
        if os.getenv("NVIDIA_API_KEY"):
            try:
                # NVIDIA NIM provides access to optimized open-source models
                self.available_models['nvidia_llama3_70b'] = Nvidia(id="meta/llama3-70b-instruct")
                self.available_models['nvidia_llama3_8b'] = Nvidia(id="meta/llama3-8b-instruct")
                self.available_models['nvidia_mixtral'] = Nvidia(id="mistralai/mixtral-8x7b-instruct-v0.1")
                self.available_models['nvidia_codellama'] = Nvidia(id="meta/codellama-70b-instruct")
                self.available_models['nvidia_gemma'] = Nvidia(id="google/gemma-7b-it")
                self.available_models['nvidia_yi'] = Nvidia(id="01-ai/yi-34b-chat")
                print("NVIDIA NIM models initialized")
            except Exception as e:
                print(f"NVIDIA models not available: {e}")

        # 2. Groq (Free tier with open-source models)
        if os.getenv("GROQ_API_KEY"):
            try:
                self.available_models['groq_llama3_70b'] = Groq(id="llama3-70b-8192")
                self.available_models['groq_llama3_8b'] = Groq(id="llama3-8b-8192")
                self.available_models['groq_mixtral'] = Groq(id="mixtral-8x7b-32768")
                self.available_models['groq_gemma'] = Groq(id="gemma-7b-it")
                print("Groq open-source models initialized")
            except Exception as e:
                print(f"Groq models not available: {e}")

        # 3. Ollama (completely free local models; requires a running Ollama server)
        try:
            self.available_models['ollama_llama3'] = Ollama(id="llama3")
            self.available_models['ollama_llama3_70b'] = Ollama(id="llama3:70b")
            self.available_models['ollama_mistral'] = Ollama(id="mistral")
            self.available_models['ollama_phi3'] = Ollama(id="phi3")
            self.available_models['ollama_codellama'] = Ollama(id="codellama")
            self.available_models['ollama_gemma'] = Ollama(id="gemma")
            self.available_models['ollama_qwen'] = Ollama(id="qwen")
            print("Ollama local models initialized")
        except Exception as e:
            print(f"Ollama models not available: {e}")

        # 4. Together AI (Open-source models)
        if os.getenv("TOGETHER_API_KEY"):
            try:
                self.available_models['together_llama3_70b'] = Together(id="meta-llama/Llama-3-70b-chat-hf")
                self.available_models['together_llama3_8b'] = Together(id="meta-llama/Llama-3-8b-chat-hf")
                self.available_models['together_mistral'] = Together(id="mistralai/Mistral-7B-Instruct-v0.1")
                self.available_models['together_qwen'] = Together(id="Qwen/Qwen2-72B-Instruct")
                print("Together AI open-source models initialized")
            except Exception as e:
                print(f"Together AI models not available: {e}")

        # 5. Anyscale (Open-source models)
        if os.getenv("ANYSCALE_API_KEY"):
            try:
                self.available_models['anyscale_llama3_70b'] = Anyscale(id="meta-llama/Llama-3-70b-chat-hf")
                self.available_models['anyscale_mistral'] = Anyscale(id="mistralai/Mistral-7B-Instruct-v0.1")
                print("Anyscale open-source models initialized")
            except Exception as e:
                print(f"Anyscale models not available: {e}")

        # 6. Hugging Face (Open-source models)
        try:
            if os.getenv("HUGGINGFACE_API_KEY"):
                self.available_models['hf_llama3_8b'] = HuggingFaceChat(id="meta-llama/Meta-Llama-3-8B-Instruct")
                self.available_models['hf_mistral'] = HuggingFaceChat(id="mistralai/Mistral-7B-Instruct-v0.1")
                print("Hugging Face open-source models initialized")
        except Exception as e:
            print(f"Hugging Face models not available: {e}")

        print(f"Total available models: {len(self.available_models)}")

    def get_model(self, model_name: str):
        """Get a specific model by name"""
        return self.available_models.get(model_name)

    def list_available_models(self) -> List[str]:
        """List all available model names"""
        return list(self.available_models.keys())

    def get_best_model_for_task(self, task_type: str):
        """Get the best available model for a specific task type"""
        preferences = {
            # Prefer larger, more capable models for reasoning
            "reasoning": ['nvidia_llama3_70b', 'groq_llama3_70b', 'together_llama3_70b',
                          'anyscale_llama3_70b', 'ollama_llama3_70b'],
            # Prefer code-specialized models
            "coding": ['nvidia_codellama', 'ollama_codellama', 'nvidia_llama3_70b', 'groq_llama3_70b'],
            # Prefer fast, smaller models
            "fast": ['groq_llama3_8b', 'nvidia_llama3_8b', 'groq_gemma', 'ollama_phi3', 'hf_llama3_8b'],
            # Prefer NVIDIA NIM for enterprise-grade tasks
            "enterprise": ['nvidia_llama3_70b', 'nvidia_mixtral', 'nvidia_codellama'],
        }
        # Return the first preferred model that was successfully initialized
        for model_name in preferences.get(task_type, []):
            if model_name in self.available_models:
                return self.available_models[model_name]
        # Default fallback to the first available model
        if self.available_models:
            return next(iter(self.available_models.values()))
        return None
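
# Quick sanity check of the manager in isolation (a sketch; it assumes at least
# one provider key is configured or a local Ollama server is reachable):
#   manager = AgnoEnhancedModelManager()
#   print(manager.list_available_models())
#   model = manager.get_best_model_for_task("reasoning")  # may be None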


# ---- Enhanced Specialized Agno Agents with NVIDIA ----
class AgnoEnhancedAgentSystem:
    """System of specialized Agno agents using open-source + NVIDIA models"""

    def __init__(self):
        self.model_manager = AgnoEnhancedModelManager()
        self.agents = {}
        self._create_specialized_agents()

    def _create_specialized_agents(self):
        """Create specialized agents for different tasks using the best available models"""
        if not AGNO_AVAILABLE:
            print("Agno not available, agents cannot be created")
            return

        # Enterprise Research Agent (NVIDIA preferred)
        enterprise_model = self.model_manager.get_best_model_for_task("enterprise")
        if enterprise_model:
            self.agents['enterprise_research'] = Agent(
                model=enterprise_model,
                tools=[DuckDuckGoTools(), WikipediaTools(), ReasoningTools()],
                description=dedent("""\
                    You are an enterprise-grade research specialist with access to optimized models.
                    Your expertise lies in comprehensive analysis, fact-checking, and providing
                    detailed, accurate responses for complex research tasks.

                    Your approach is:
                    - Enterprise-level accuracy and reliability
                    - Comprehensive and thorough analysis
                    - Multi-source verification
                    - Professional-grade output quality
                    """),
                instructions=dedent("""\
                    1. Use advanced reasoning capabilities for complex analysis
                    2. Cross-reference multiple sources for maximum accuracy
                    3. Provide comprehensive, well-structured responses
                    4. Include confidence levels and source reliability assessment
                    5. Always end with 'FINAL ANSWER: [your comprehensive answer]'
                    6. Prioritize accuracy and completeness over speed
                    """),
                memory=AgentMemory(),
                markdown=True,
                show_tool_calls=True,
                add_datetime_to_instructions=True,
            )

        # Advanced Math Agent (best reasoning model)
        math_model = self.model_manager.get_best_model_for_task("reasoning")
        if math_model:
            self.agents['advanced_math'] = Agent(
                model=math_model,
                tools=[Calculator(), ReasoningTools()],
                description=dedent("""\
                    You are an advanced mathematics expert with access to powerful reasoning models.
                    You excel at complex mathematical problem solving, statistical analysis,
                    and providing step-by-step solutions with high accuracy.

                    Your approach is:
                    - Rigorous mathematical methodology
                    - Step-by-step problem decomposition
                    - High-precision calculations
                    - Clear mathematical communication
                    """),
                instructions=dedent("""\
                    1. Break down complex mathematical problems systematically
                    2. Use advanced reasoning for multi-step problems
                    3. Show detailed work and methodology
                    4. Verify calculations using multiple approaches when possible
                    5. Provide exact numerical answers without commas or units unless specified
                    6. Always end with 'FINAL ANSWER: [precise numerical result]'
                    """),
                memory=AgentMemory(),
                markdown=True,
                show_tool_calls=True,
            )

        # Fast Response Agent (optimized for speed)
        fast_model = self.model_manager.get_best_model_for_task("fast")
        if fast_model:
            self.agents['fast_response'] = Agent(
                model=fast_model,
                tools=[DuckDuckGoTools(), WikipediaTools()],
                description=dedent("""\
                    You are a rapid response specialist optimized for quick, accurate answers.
                    You provide concise, direct responses while maintaining high quality standards.

                    Your approach is:
                    - Speed-optimized processing
                    - Direct and concise communication
                    - Efficient tool usage
                    - Quality maintained at high speed
                    """),
                instructions=dedent("""\
                    1. Provide quick, accurate answers
                    2. Use tools efficiently, and only when necessary
                    3. Be direct and avoid unnecessary elaboration
                    4. Maintain accuracy despite the speed focus
                    5. Always end with 'FINAL ANSWER: [your concise answer]'
                    6. Prioritize clarity and correctness
                    """),
                markdown=True,
                show_tool_calls=False,
            )

        # Advanced Coding Agent (code-specialized model)
        coding_model = self.model_manager.get_best_model_for_task("coding")
        if coding_model:
            self.agents['advanced_coding'] = Agent(
                model=coding_model,
                tools=[ReasoningTools()],
                description=dedent("""\
                    You are an advanced programming expert with access to code-specialized models.
                    You excel at complex code generation, algorithm design, debugging, and
                    software architecture recommendations.

                    Your approach is:
                    - Advanced programming methodologies
                    - Clean, efficient code generation
                    - Comprehensive error handling
                    - Best practices implementation
                    """),
                instructions=dedent("""\
                    1. Write production-quality, well-documented code
                    2. Follow industry best practices and design patterns
                    3. Include comprehensive error handling and edge cases
                    4. Provide clear explanations of code logic
                    5. Consider performance, security, and maintainability
                    6. Always end with 'FINAL ANSWER: [your code solution]'
                    """),
                memory=AgentMemory(),
                markdown=True,
                show_tool_calls=True,
            )

        # Standard Research Agent (fallback when no enterprise agent exists)
        research_model = self.model_manager.get_best_model_for_task("reasoning")
        if research_model and 'enterprise_research' not in self.agents:
            self.agents['research'] = Agent(
                model=research_model,
                tools=[DuckDuckGoTools(), WikipediaTools(), ReasoningTools()],
                description=dedent("""\
                    You are a research specialist with expertise in finding and analyzing information.
                    Your specialty lies in gathering comprehensive data from multiple sources.
                    """),
                instructions=dedent("""\
                    1. Use search tools to find current and relevant information
                    2. Apply systematic reasoning to analyze findings
                    3. Provide comprehensive answers with sources
                    4. Always end with 'FINAL ANSWER: [your answer]'
                    """),
                memory=AgentMemory(),
                markdown=True,
                show_tool_calls=True,
            )

        print(f"Created {len(self.agents)} specialized Agno agents with enhanced models")

    def route_query(self, query: str) -> str:
        """Route a query to the most appropriate agent"""
        q_lower = query.lower()
        # Keyword-based routing to the specialized agents. The 'math', 'coding',
        # and 'fast' keys are never created by this module; they are kept as
        # fallbacks in case agents are registered under those names elsewhere.
        if any(keyword in q_lower for keyword in ["calculate", "math", "multiply", "add", "subtract", "divide", "compute", "statistical"]):
            if 'advanced_math' in self.agents:
                return self._query_agent('advanced_math', query)
            elif 'math' in self.agents:
                return self._query_agent('math', query)
        elif any(keyword in q_lower for keyword in ["code", "programming", "function", "algorithm", "python", "javascript", "debug"]):
            if 'advanced_coding' in self.agents:
                return self._query_agent('advanced_coding', query)
            elif 'coding' in self.agents:
                return self._query_agent('coding', query)
        elif any(keyword in q_lower for keyword in ["enterprise", "analysis", "comprehensive", "detailed", "professional"]):
            if 'enterprise_research' in self.agents:
                return self._query_agent('enterprise_research', query)
        elif any(keyword in q_lower for keyword in ["research", "find", "search", "information", "study", "analyze"]):
            if 'enterprise_research' in self.agents:
                return self._query_agent('enterprise_research', query)
            elif 'research' in self.agents:
                return self._query_agent('research', query)
        elif len(query.split()) < 10:  # Short queries go to the fast agent
            if 'fast_response' in self.agents:
                return self._query_agent('fast_response', query)
            elif 'fast' in self.agents:
                return self._query_agent('fast', query)
        # Default to the best available agent
        if 'enterprise_research' in self.agents:
            return self._query_agent('enterprise_research', query)
        elif 'research' in self.agents:
            return self._query_agent('research', query)
        elif self.agents:
            agent_name = next(iter(self.agents))
            return self._query_agent(agent_name, query)
        return "No agents available"

    def _query_agent(self, agent_name: str, query: str) -> str:
        """Query a specific agent"""
        try:
            agent = self.agents[agent_name]
            result = agent.run(query)
            # Agent.run returns a RunResponse object in current Agno releases;
            # fall back to str() for versions that return plain text.
            response = getattr(result, "content", None) or str(result)
            # Extract the final answer if present
            if "FINAL ANSWER:" in response:
                return response.split("FINAL ANSWER:")[-1].strip()
            return response.strip()
        except Exception as e:
            return f"Error with {agent_name} agent: {e}"

    def get_system_info(self) -> Dict[str, Any]:
        """Get information about available agents and models"""
        model_names = self.model_manager.list_available_models()
        model_breakdown = {
            f"{prefix}_models": [m for m in model_names if m.startswith(f"{prefix}_")]
            for prefix in ("nvidia", "groq", "ollama", "together", "anyscale", "hf")
        }
        return {
            "available_models": model_names,
            "model_breakdown": model_breakdown,
            "active_agents": list(self.agents.keys()),
            "agno_available": AGNO_AVAILABLE,
            "total_models": len(model_names),
            "nvidia_available": len(model_breakdown["nvidia_models"]) > 0,
        }


# ---- Enhanced Agent State for LangGraph compatibility ----
class EnhancedAgentState(TypedDict):
    """State structure for compatibility with the existing system."""
    messages: Annotated[List[HumanMessage | AIMessage], operator.add]
    query: str
    agent_type: str
    final_answer: str
    perf: Dict[str, Any]
    tools_used: List[str]
    reasoning: str
    model_used: str
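
# Example initial state; process_node below only reads "query" and writes
# "final_answer", while the remaining fields exist for compatibility:
#   {"messages": [], "query": "What is 2 + 2?", "agent_type": "",
#    "final_answer": "", "perf": {}, "tools_used": [], "reasoning": "",
#    "model_used": ""}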


# ---- Unified System with Enhanced NVIDIA Integration ----
class UnifiedAgnoEnhancedSystem:
    """Unified system that integrates Agno agents with NVIDIA + open-source models"""

    def __init__(self):
        if AGNO_AVAILABLE:
            print("Using enhanced Agno-based system with NVIDIA + open-source models")
            self.agno_system = AgnoEnhancedAgentSystem()
            self.graph = self._build_compatibility_graph()
        else:
            print("Agno not available")
            self.agno_system = None
            self.graph = None

    def _build_compatibility_graph(self):
        """Build a LangGraph for compatibility with the existing app.py"""
        def process_node(state: EnhancedAgentState) -> EnhancedAgentState:
            """Process the query through the enhanced Agno system"""
            query = state.get("query", "")
            if self.agno_system:
                answer = self.agno_system.route_query(query)
                return {**state, "final_answer": answer}
            return {**state, "final_answer": "Enhanced Agno system not available"}

        g = StateGraph(EnhancedAgentState)
        g.add_node("process", process_node)
        g.set_entry_point("process")
        g.add_edge("process", END)
        return g.compile(checkpointer=MemorySaver())
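
    # Note: because the graph is compiled with a MemorySaver checkpointer,
    # every invocation must supply a thread id, e.g.:
    #   graph.invoke(state, config={"configurable": {"thread_id": "session-1"}})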

    def process_query(self, query: str) -> str:
        """Process a query through the unified enhanced system"""
        if self.agno_system:
            return self.agno_system.route_query(query)
        return "Enhanced Agno system not available"

    def get_system_info(self) -> Dict[str, Any]:
        """Get information about the current enhanced system"""
        if self.agno_system:
            return self.agno_system.get_system_info()
        return {"system": "agno_unavailable", "agno_available": False}


# ---- Build Graph Function (for compatibility) ----
def build_graph(provider: str = "agno_enhanced"):
    """Build a graph using enhanced Agno models, including NVIDIA.

    The 'provider' argument is accepted for signature compatibility with the
    existing app.py and is currently unused.
    """
    system = UnifiedAgnoEnhancedSystem()
    return system.graph if system.graph else None
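
# Caller-side sketch (hypothetical; no such caller is defined in this file):
#   graph = build_graph()
#   if graph is not None:
#       out = graph.invoke(
#           {"query": "What is the capital of France?", "messages": []},
#           config={"configurable": {"thread_id": "demo"}},
#       )
#       print(out["final_answer"])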


# ---- Main execution ----
if __name__ == "__main__":
    # Initialize the enhanced unified system
    system = UnifiedAgnoEnhancedSystem()

    # Print system information
    info = system.get_system_info()
    print("Enhanced Agno System Information:")
    for key, value in info.items():
        if isinstance(value, dict):
            print(f"  {key}:")
            for subkey, subvalue in value.items():
                print(f"    {subkey}: {subvalue}")
        else:
            print(f"  {key}: {value}")

    # Test queries
    test_questions = [
        "Enterprise analysis: What is 25 multiplied by 17?",
        "Research the latest developments in quantum computing",
        "Write an advanced Python function to calculate factorial with error handling",
        "Find comprehensive information about Mercedes Sosa albums between 2000-2009",
        "Quick answer: What is the capital of France?",
    ]

    print("\n" + "=" * 60)
    print("Testing Enhanced Agno Multi-LLM System with NVIDIA")
    print("=" * 60)

    for i, question in enumerate(test_questions, 1):
        print(f"\nQuestion {i}: {question}")
        print("-" * 50)
        answer = system.process_query(question)
        print(f"Answer: {answer}")