Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Enhanced MCP Client with LLM-based task decomposition, intelligent agent routing, and real MCP protocol. | |
| This client uses AI for smart query analysis and agent coordination instead of hard-coded rules. | |
| """ | |
| import gradio as gr | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| import argparse | |
| from typing import Dict, List, Any, Optional | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| import uuid | |
| from pathlib import Path | |
| import requests | |
| from smolagents import MCPClient, LiteLLMModel | |
| ANTHROPIC_API_KEY = os.environ.get('ANTHROPIC_API_KEY') | |
| def setup_environment(): | |
| """Set up environment variables and configuration.""" | |
| global ANTHROPIC_API_KEY | |
| # Validate API keys | |
| print("\nπ API Configuration:") | |
| print(f"Anthropic API Key: {'β Configured' if ANTHROPIC_API_KEY else 'β Missing'}") | |
| if not ANTHROPIC_API_KEY: | |
| print("β οΈ Warning: ANTHROPIC_API_KEY not found in environment") | |
| print("π‘ Set environment variable: ANTHROPIC_API_KEY=your_anthropic_key") | |
| class TaskType(Enum): | |
| """Types of tasks that can be decomposed.""" | |
| SENTIMENT_ANALYSIS = "sentiment_analysis" | |
| LOCATION_SEARCH = "place_search" | |
| RESTAURANT_SEARCH = "restaurant_search" | |
| HIKING_SEARCH = "hiking_search" | |
| WEB_SEARCH = "web_search" | |
| COMPLEX_QUERY = "complex_query" | |
| class SubTask: | |
| """Represents a sub-atomic task.""" | |
| id: str | |
| task_type: TaskType | |
| description: str | |
| parameters: Dict[str, Any] | |
| agent_id: str | |
| confidence: float = 0.5 | |
| status: str = "pending" | |
| result: Optional[Dict[str, Any]] = None | |
| class Agent: | |
| """Represents a dedicated agent for handling specific tools.""" | |
| id: str | |
| name: str | |
| tool_name: str | |
| description: str | |
| capabilities: List[str] | |
| keywords: List[str] | |
| class LLMTaskDecomposer: | |
| """LLM-powered task decomposer using system prompts for intelligent query analysis.""" | |
| def __init__(self, model_name: str = "anthropic"): | |
| """Initialize with support for multiple LLM providers.""" | |
| self.model_name = model_name.lower() | |
| self.model = None | |
| # Initialize the model based on selection | |
| self._initialize_model() | |
| self.agents = self._initialize_agents() | |
| def _initialize_model(self): | |
| """Initialize the selected model with proper error handling.""" | |
| try: | |
| if self.model_name == "anthropic": | |
| if not ANTHROPIC_API_KEY: | |
| print("β ANTHROPIC_API_KEY environment variable is required for Anthropic model") | |
| print("π‘ Model will fall back to keyword-based decomposition") | |
| self.model = None | |
| return | |
| print(f"π§ Initializing Anthropic model...") | |
| self.model = LiteLLMModel( | |
| model_id="anthropic/claude-sonnet-4-20250514", | |
| temperature=0.2, | |
| api_key=ANTHROPIC_API_KEY | |
| ) | |
| # Test the model with a simple call | |
| try: | |
| test_response = self.model([{"role": "user", "content": "Hello"}]) | |
| print(f"β Anthropic model initialized and tested successfully") | |
| print(f"π§ Model response test: {str(test_response)[:50]}...") | |
| except Exception as test_error: | |
| print(f"β οΈ Model initialized but test call failed: {test_error}") | |
| print(f"π Will attempt to use model anyway, with fallback to keywords") | |
| else: | |
| print(f"β Unknown model name: {self.model_name}") | |
| self.model = None | |
| except Exception as e: | |
| print(f"β Model initialization failed: {e}") | |
| print(f"π Falling back to keyword-based decomposition") | |
| self.model = None | |
| def get_model_info(self) -> Dict[str, str]: | |
| """Get information about the current model.""" | |
| if self.model_name == "anthropic": | |
| return { | |
| "name": "Claude Sonnet 4", | |
| "provider": "Anthropic", | |
| "emoji": "π€", | |
| "model_id": "anthropic/claude-sonnet-4-20250514", | |
| "status": "initialized" if self.model else "failed" | |
| } | |
| else: | |
| return { | |
| "name": "Unknown Model", | |
| "provider": "Unknown", | |
| "emoji": "β", | |
| "model_id": "unknown", | |
| "status": "failed" | |
| } | |
| def _initialize_agents(self) -> Dict[str, Agent]: | |
| """Initialize specialized agents with their capabilities and keywords.""" | |
| agents = { | |
| "sentiment_agent": Agent( | |
| id="sentiment_agent", | |
| name="Sentiment Analysis Agent", | |
| tool_name="sentiment_analysis", | |
| description="Analyzes text sentiment, emotions, and opinions", | |
| capabilities=["text_analysis", "emotion_detection", "polarity_scoring", "opinion_mining"], | |
| keywords=["sentiment", "feeling", "opinion", "review", "emotion", "mood", "analyze text", "positive", "negative", "happy", "sad", "angry", "excited"] | |
| ), | |
| "location_agent": Agent( | |
| id="location_agent", | |
| name="Location Search Agent", | |
| tool_name="place_search", | |
| description="Finds hotels, accommodations, and places to stay", | |
| capabilities=["place_search", "hotel_finder", "accommodation_search", "lodging_recommendations"], | |
| keywords=["hotel", "hotels", "stay", "accommodation", "lodging", "motel", "resort", "inn", "bed and breakfast", "airbnb", "place to stay"] | |
| ), | |
| "restaurant_agent": Agent( | |
| id="restaurant_agent", | |
| name="Restaurant Search Agent", | |
| tool_name="restaurant_search", | |
| description="Discovers restaurants, food places, and dining options", | |
| capabilities=["restaurant_search", "cuisine_finder", "dining_recommendations", "food_discovery"], | |
| keywords=["restaurant", "restaurants", "food", "dining", "eat", "dinner", "lunch", "breakfast", "cafe", "bar", "cuisine", "meal", "dining out"] | |
| ), | |
| "hiking_agent": Agent( | |
| id="hiking_agent", | |
| name="Hiking Search Agent", | |
| tool_name="hiking_search", | |
| description="Finds hiking trails, outdoor activities, and nature spots", | |
| capabilities=["trail_finder", "outdoor_activities", "difficulty_assessment", "nature_exploration"], | |
| keywords=["hike", "hiking", "trail", "trails", "trek", "trekking", "outdoor", "mountain", "nature", "walk", "walking", "climbing", "adventure"] | |
| ), | |
| "web_agent": Agent( | |
| id="web_agent", | |
| name="Web Search Agent", | |
| tool_name="web_search", | |
| description="Searches web for information, news, weather, finance, and general queries with intelligent ticker detection for financial data", | |
| capabilities=["web_search", "information_retrieval", "real_time_data", "news_search", "weather_data", "financial_data", "ticker_detection"], | |
| keywords=["search", "find", "lookup", "google", "web", "information", "weather", "news", "current", "latest", "what is", "definition", "stock", "price", "market", "finance"] | |
| ) | |
| } | |
| return agents | |
| async def decompose_query(self, user_query: str) -> List[SubTask]: | |
| """ | |
| Use LLM to analyze user query and decompose into actionable subtasks. | |
| """ | |
| print(f"π Decomposing query: '{user_query}'") | |
| try: | |
| # Create decomposition prompt | |
| system_prompt = self._create_decomposition_prompt() | |
| # Prepare the user message | |
| user_message = f"""Query to analyze: "{user_query}" | |
| Please analyze this query and respond with a JSON object containing your analysis.""" | |
| print(f"π§ Attempting LLM decomposition with model: {self.model_name}") | |
| # Use Anthropic model (synchronous) | |
| if self.model_name == "anthropic" and self.model is not None: | |
| # Use LiteLLM model directly (synchronous) | |
| try: | |
| print(f"π‘ Calling LLM model...") | |
| response = self.model([ | |
| {"role": "system", "content": system_prompt}, | |
| {"role": "user", "content": user_message} | |
| ]) | |
| print(f"β LLM response received: {str(response)[:200]}...") | |
| print(f"π Response type: {type(response)}") | |
| except Exception as e: | |
| print(f"β Model call failed: {e}") | |
| print(f"π Falling back to keyword-based decomposition") | |
| return self._fallback_decomposition(user_query) | |
| else: | |
| print(f"β Model not available ({self.model_name}), using fallback") | |
| return self._fallback_decomposition(user_query) | |
| # Parse LLM response | |
| print(f"π Parsing LLM response...") | |
| analysis = self._parse_llm_response(response, user_query) | |
| print(f"π Analysis result: {analysis}") | |
| # Convert analysis to subtasks | |
| print(f"π― Creating subtasks from analysis...") | |
| subtasks = self._create_subtasks(analysis, user_query) | |
| print(f"β Generated {len(subtasks)} subtasks") | |
| if not subtasks: | |
| print("β οΈ No subtasks generated, using fallback") | |
| return self._fallback_decomposition(user_query) | |
| # Debug: print subtask details | |
| for i, subtask in enumerate(subtasks): | |
| print(f" π Subtask {i+1}: {subtask.agent_id} -> {subtask.description}") | |
| return subtasks | |
| except Exception as e: | |
| print(f"β Task decomposition failed: {e}") | |
| print(f"π Using fallback decomposition") | |
| return self._fallback_decomposition(user_query) | |
| def _create_decomposition_prompt(self) -> str: | |
| """Create comprehensive system prompt for task decomposition.""" | |
| agent_descriptions = [] | |
| for agent_id, agent in self.agents.items(): | |
| agent_descriptions.append(f""" | |
| **{agent.name}** ({agent_id}): | |
| - Description: {agent.description} | |
| - Tool: {agent.tool_name} | |
| - Keywords: {', '.join(agent.keywords[:10])} | |
| - Capabilities: {', '.join(agent.capabilities)} | |
| """) | |
| return f"""You are an intelligent task decomposer for a multi-agent system. Your job is to analyze user queries and route them to the most appropriate specialized agents. | |
| AVAILABLE AGENTS: | |
| {chr(10).join(agent_descriptions)} | |
| TASK DECOMPOSITION RULES: | |
| 1. **Analyze Intent**: Identify the primary purpose of the user's query | |
| 2. **Extract Entities**: Find locations, keywords, parameters, and specific requirements | |
| 3. **Route Intelligently**: Choose the most appropriate agent(s) based on intent and entities | |
| 4. **Handle Complex Queries**: Break down multi-intent queries into separate tasks | |
| 5. **Provide Fallbacks**: Use web_agent for ambiguous or unsupported queries | |
| RESPONSE FORMAT: | |
| Always respond with valid JSON in this exact format: | |
| {{ | |
| "analysis": {{ | |
| "query_type": "simple|complex|ambiguous", | |
| "primary_intent": "brief description of main intent", | |
| "complexity_score": 0.0-1.0, | |
| "location_extracted": "location if found or null", | |
| "entities": ["entity1", "entity2"], | |
| "reasoning": "brief explanation of your analysis" | |
| }}, | |
| "tasks": [ | |
| {{ | |
| "task_id": "unique_id", | |
| "agent_id": "agent_name", | |
| "description": "clear task description", | |
| "parameters": {{"param1": "value1"}}, | |
| "confidence": 0.0-1.0, | |
| "priority": 1-5 | |
| }} | |
| ] | |
| }} | |
| TOOL PARAMETER SPECIFICATIONS: | |
| - **web_search**: {{"query": "search_terms", "max_results": 5}} | |
| - **sentiment_analysis**: {{"text": "text_to_analyze"}} | |
| - **place_search**: {{"query": "location", "max_distance": 20}} | |
| - **restaurant_search**: {{"query": "location", "cuisine": "cuisine_type_or_null"}} | |
| - **hiking_search**: {{"location": "location", "difficulty": "easy|moderate|hard|null", "max_distance": 30}} | |
| COMPREHENSIVE EXAMPLES: | |
| **Financial/Stock Queries (Enhanced with Ticker Detection):** | |
| Query: "What's NVIDIA's current stock price?" | |
| {{ | |
| "analysis": {{"query_type": "simple", "primary_intent": "get financial data", "complexity_score": 0.3, "location_extracted": null, "entities": ["NVIDIA", "stock price"], "reasoning": "Financial query for real-time stock data - ticker detection will enhance this"}}, | |
| "tasks": [{{"task_id": "web_001", "agent_id": "web_agent", "description": "Get current NVIDIA stock price with intelligent ticker detection", "parameters": {{"query": "NVIDIA stock price", "max_results": 5}}, "confidence": 0.95, "priority": 1}}] | |
| }} | |
| **General Web Search:** | |
| Query: "Latest news about AI technology" | |
| {{ | |
| "analysis": {{"query_type": "simple", "primary_intent": "search for news", "complexity_score": 0.3, "location_extracted": null, "entities": ["news", "AI", "technology"], "reasoning": "General web search for current information"}}, | |
| "tasks": [{{"task_id": "web_002", "agent_id": "web_agent", "description": "Search for latest AI technology news", "parameters": {{"query": "latest AI technology news", "max_results": 5}}, "confidence": 0.9, "priority": 1}}] | |
| }} | |
| **Hiking/Outdoor Queries:** | |
| Query: "Find moderate hiking trails near Seattle within 30 miles" | |
| {{ | |
| "analysis": {{"query_type": "simple", "primary_intent": "find hiking trails", "complexity_score": 0.4, "location_extracted": "Seattle", "entities": ["hiking", "trails", "moderate", "Seattle", "30 miles"], "reasoning": "Outdoor activity search with specific location and difficulty"}}, | |
| "tasks": [{{"task_id": "hiking_001", "agent_id": "hiking_agent", "description": "Find moderate hiking trails near Seattle", "parameters": {{"location": "Seattle", "difficulty": "moderate", "max_distance": 30}}, "confidence": 0.95, "priority": 1}}] | |
| }} | |
| **Hotel/Accommodation Queries:** | |
| Query: "Best luxury hotels in Paris near Eiffel Tower" | |
| {{ | |
| "analysis": {{"query_type": "simple", "primary_intent": "find accommodation", "complexity_score": 0.4, "location_extracted": "Paris", "entities": ["hotels", "luxury", "Paris", "Eiffel Tower"], "reasoning": "Accommodation search with location and luxury preference"}}, | |
| "tasks": [{{"task_id": "place_001", "agent_id": "location_agent", "description": "Find luxury hotels in Paris near Eiffel Tower", "parameters": {{"query": "luxury hotels Paris near Eiffel Tower", "max_distance": 20}}, "confidence": 0.9, "priority": 1}}] | |
| }} | |
| **Restaurant/Food Queries:** | |
| Query: "Italian restaurants in New York with outdoor seating" | |
| {{ | |
| "analysis": {{"query_type": "simple", "primary_intent": "find restaurants", "complexity_score": 0.4, "location_extracted": "New York", "entities": ["Italian", "restaurants", "New York", "outdoor seating"], "reasoning": "Restaurant search with cuisine and location preferences"}}, | |
| "tasks": [{{"task_id": "rest_001", "agent_id": "restaurant_agent", "description": "Find Italian restaurants in New York with outdoor seating", "parameters": {{"query": "New York", "cuisine": "Italian"}}, "confidence": 0.9, "priority": 1}}] | |
| }} | |
| **Sentiment Analysis Queries:** | |
| Query: "Analyze sentiment: 'This product is amazing and exceeded my expectations!'" | |
| {{ | |
| "analysis": {{"query_type": "simple", "primary_intent": "analyze text sentiment", "complexity_score": 0.2, "location_extracted": null, "entities": ["sentiment", "text analysis"], "reasoning": "Clear sentiment analysis request with provided text"}}, | |
| "tasks": [{{"task_id": "sent_001", "agent_id": "sentiment_agent", "description": "Analyze sentiment of product review", "parameters": {{"text": "This product is amazing and exceeded my expectations!"}}, "confidence": 0.95, "priority": 1}}] | |
| }} | |
| **Complex Multi-Intent Queries:** | |
| Query: "I'm planning a trip to Tokyo - need hotels and restaurants" | |
| {{ | |
| "analysis": {{"query_type": "complex", "primary_intent": "travel planning with accommodation and dining", "complexity_score": 0.7, "location_extracted": "Tokyo", "entities": ["trip", "Tokyo", "hotels", "restaurants"], "reasoning": "Multi-intent travel query requiring both accommodation and restaurant search"}}, | |
| "tasks": [ | |
| {{"task_id": "place_001", "agent_id": "location_agent", "description": "Find hotels in Tokyo", "parameters": {{"query": "Tokyo", "max_distance": 20}}, "confidence": 0.9, "priority": 1}}, | |
| {{"task_id": "rest_001", "agent_id": "restaurant_agent", "description": "Find restaurants in Tokyo", "parameters": {{"query": "Tokyo", "cuisine": null}}, "confidence": 0.9, "priority": 1}} | |
| ] | |
| }} | |
| **Weather/News/General Web Queries:** | |
| Query: "Latest news about artificial intelligence developments" | |
| {{ | |
| "analysis": {{"query_type": "simple", "primary_intent": "get current news information", "complexity_score": 0.3, "location_extracted": null, "entities": ["news", "artificial intelligence"], "reasoning": "Information retrieval query requiring web search"}}, | |
| "tasks": [{{"task_id": "web_001", "agent_id": "web_agent", "description": "Get latest AI news", "parameters": {{"query": "latest news artificial intelligence developments"}}, "confidence": 0.9, "priority": 1}}] | |
| }} | |
| **Ambiguous Queries:** | |
| Query: "Tell me about Paris" | |
| {{ | |
| "analysis": {{"query_type": "ambiguous", "primary_intent": "get general information", "complexity_score": 0.5, "location_extracted": "Paris", "entities": ["Paris"], "reasoning": "Vague query - could be travel, history, or general info - use web search"}}, | |
| "tasks": [{{"task_id": "web_001", "agent_id": "web_agent", "description": "Get general information about Paris", "parameters": {{"query": "Paris information travel guide"}}, "confidence": 0.7, "priority": 1}}] | |
| }} | |
| INTELLIGENT ROUTING GUIDELINES: | |
| - **Keywords for hiking_agent**: hiking, trails, trek, outdoor, mountain, nature, walk, climbing, adventure | |
| - **Keywords for location_agent**: hotel, hotels, accommodation, stay, lodging, motel, resort, inn | |
| - **Keywords for restaurant_agent**: restaurant, food, dining, eat, cuisine, meal, cafe, bar | |
| - **Keywords for sentiment_agent**: sentiment, analyze, opinion, feeling, emotion, review, mood | |
| - **Keywords for web_agent**: news, weather, stock, price, latest, current, information, what is | |
| PARAMETER EXTRACTION RULES: | |
| - **Locations**: Look for city names, landmarks, "in", "near", "at", "around" | |
| - **Difficulties**: easy, moderate, hard, difficult, challenging, extreme | |
| - **Distances**: "within X miles", "X km radius", "close to" | |
| - **Cuisines**: Italian, Chinese, Mexican, etc. | |
| - **Accommodations**: luxury, budget, boutique, business, etc. | |
| IMPORTANT: | |
| - Always provide valid JSON | |
| - Use exact agent_id values from the list above | |
| - Extract locations and parameters accurately | |
| - Assign appropriate confidence scores based on query clarity | |
| - For unclear queries, use web_agent as fallback | |
| - Be specific in task descriptions and reasoning""" | |
| def _parse_llm_response(self, response, original_query: str) -> Dict[str, Any]: | |
| """Parse LLM response and extract structured analysis.""" | |
| try: | |
| # Handle different response types | |
| if hasattr(response, 'content'): | |
| # ChatMessage object - extract content | |
| response_text = response.content | |
| elif hasattr(response, 'text'): | |
| # Some other response object with text attribute | |
| response_text = response.text | |
| elif isinstance(response, str): | |
| # Already a string | |
| response_text = response | |
| else: | |
| # Try to convert to string | |
| response_text = str(response) | |
| print(f"π Raw response text: {response_text[:500]}...") | |
| # Clean up markdown code blocks if present | |
| import re | |
| # Remove markdown code block markers | |
| response_text = re.sub(r'```json\s*', '', response_text) | |
| response_text = re.sub(r'```\s*$', '', response_text) | |
| response_text = response_text.strip() | |
| # Try to extract JSON from the response - more robust pattern | |
| json_patterns = [ | |
| r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', # Simple nested braces | |
| r'\{.*\}', # Original fallback pattern | |
| ] | |
| analysis = None | |
| # First try: Direct JSON parsing if the response looks like pure JSON | |
| if response_text.strip().startswith('{') and response_text.strip().endswith('}'): | |
| try: | |
| analysis = json.loads(response_text.strip()) | |
| print(f"β Successfully parsed JSON via direct parsing") | |
| except json.JSONDecodeError: | |
| print(f"β οΈ Direct JSON parsing failed, trying pattern matching") | |
| # Second try: Pattern matching | |
| if not analysis: | |
| for pattern in json_patterns: | |
| json_match = re.search(pattern, response_text, re.DOTALL) | |
| if json_match: | |
| try: | |
| json_text = json_match.group().strip() | |
| print(f"π Extracted JSON: {json_text[:200]}...") | |
| analysis = json.loads(json_text) | |
| print(f"β Successfully parsed JSON analysis via pattern matching") | |
| break | |
| except json.JSONDecodeError as json_error: | |
| print(f"β JSON decode error with pattern {pattern}: {json_error}") | |
| continue | |
| # Third try: Find balanced braces manually | |
| if not analysis: | |
| try: | |
| analysis = self._extract_json_with_balanced_braces(response_text) | |
| if analysis: | |
| print(f"β Successfully parsed JSON via balanced brace extraction") | |
| except Exception as brace_error: | |
| print(f"β Balanced brace extraction failed: {brace_error}") | |
| if analysis: | |
| return analysis | |
| else: | |
| raise ValueError("No valid JSON found in response") | |
| except Exception as e: | |
| print(f"β Error parsing LLM response: {e}") | |
| print(f"π Falling back to keyword-based analysis") | |
| # Return fallback analysis | |
| return { | |
| "analysis": { | |
| "query_type": "simple", | |
| "primary_intent": "general query", | |
| "complexity_score": 0.5, | |
| "location_extracted": None, | |
| "entities": [], | |
| "reasoning": f"LLM parsing failed: {str(e)}" | |
| }, | |
| "tasks": [ | |
| { | |
| "task_id": "web_fallback", | |
| "agent_id": "web_agent", | |
| "description": original_query, | |
| "parameters": {"query": original_query, "category": None}, | |
| "confidence": 0.5, | |
| "priority": 1 | |
| } | |
| ] | |
| } | |
| def _extract_json_with_balanced_braces(self, text: str) -> Optional[Dict[str, Any]]: | |
| """Extract JSON by finding balanced braces manually.""" | |
| import json | |
| # Find the first opening brace | |
| start_idx = text.find('{') | |
| if start_idx == -1: | |
| return None | |
| # Count braces to find the matching closing brace | |
| brace_count = 0 | |
| end_idx = start_idx | |
| in_string = False | |
| escape_next = False | |
| for i, char in enumerate(text[start_idx:], start_idx): | |
| if escape_next: | |
| escape_next = False | |
| continue | |
| if char == '\\': | |
| escape_next = True | |
| continue | |
| if char == '"' and not escape_next: | |
| in_string = not in_string | |
| continue | |
| if not in_string: | |
| if char == '{': | |
| brace_count += 1 | |
| elif char == '}': | |
| brace_count -= 1 | |
| if brace_count == 0: | |
| end_idx = i | |
| break | |
| if brace_count == 0: | |
| json_text = text[start_idx:end_idx + 1] | |
| try: | |
| return json.loads(json_text) | |
| except json.JSONDecodeError: | |
| return None | |
| return None | |
| def _create_subtasks(self, analysis: Dict[str, Any], original_query: str) -> List[SubTask]: | |
| """Convert LLM analysis into SubTask objects.""" | |
| subtasks = [] | |
| tasks = analysis.get("tasks", []) | |
| if not tasks: | |
| # Fallback if no tasks generated | |
| tasks = [{ | |
| "task_id": "fallback_001", | |
| "agent_id": "web_agent", | |
| "description": original_query, | |
| "parameters": {"query": original_query}, | |
| "confidence": 0.5, | |
| "priority": 1 | |
| }] | |
| for task_data in tasks: | |
| agent_id = task_data.get("agent_id", "web_agent") | |
| # Map agent_id to task_type | |
| task_type_mapping = { | |
| "sentiment_agent": TaskType.SENTIMENT_ANALYSIS, | |
| "location_agent": TaskType.LOCATION_SEARCH, | |
| "restaurant_agent": TaskType.RESTAURANT_SEARCH, | |
| "hiking_agent": TaskType.HIKING_SEARCH, | |
| "web_agent": TaskType.WEB_SEARCH | |
| } | |
| task_type = task_type_mapping.get(agent_id, TaskType.WEB_SEARCH) | |
| subtask = SubTask( | |
| id=task_data.get("task_id", str(uuid.uuid4())), | |
| task_type=task_type, | |
| description=task_data.get("description", original_query), | |
| parameters=task_data.get("parameters", {"query": original_query}), | |
| agent_id=agent_id, | |
| confidence=task_data.get("confidence", 0.5) | |
| ) | |
| subtasks.append(subtask) | |
| return subtasks | |
| def _fallback_decomposition(self, user_query: str) -> List[SubTask]: | |
| """Fallback decomposition using simple keyword matching.""" | |
| print(f"π Using fallback decomposition for: '{user_query}'") | |
| query_lower = user_query.lower() | |
| # Simple keyword-based classification | |
| if any(word in query_lower for word in ["sentiment", "feeling", "opinion", "emotion", "analyze"]): | |
| print(f"π Detected sentiment analysis request") | |
| # Extract text to analyze - look for text in quotes or after "analyze sentiment:" | |
| import re | |
| text_to_analyze = user_query | |
| # Try to extract quoted text first | |
| quote_pattern = r"['\"]([^'\"]+)['\"]" | |
| quote_match = re.search(quote_pattern, user_query) | |
| if quote_match: | |
| text_to_analyze = quote_match.group(1) | |
| print(f"π Extracted quoted text: '{text_to_analyze}'") | |
| # Try to extract text after "analyze sentiment:" or similar patterns | |
| elif "analyze sentiment:" in query_lower: | |
| parts = user_query.split(":", 1) | |
| if len(parts) > 1: | |
| text_to_analyze = parts[1].strip().strip("'\"") | |
| print(f"π Extracted text after colon: '{text_to_analyze}'") | |
| # Try to extract text after "sentiment" keyword | |
| elif "sentiment" in query_lower: | |
| # Look for patterns like "sentiment of X" or "analyze X sentiment" | |
| sentiment_patterns = [ | |
| r"sentiment[:\s]+['\"]?([^'\"]+)['\"]?", | |
| r"analyze[:\s]+['\"]?([^'\"]+)['\"]?\s+sentiment", | |
| r"['\"]([^'\"]+)['\"].*sentiment" | |
| ] | |
| for pattern in sentiment_patterns: | |
| match = re.search(pattern, user_query, re.IGNORECASE) | |
| if match: | |
| text_to_analyze = match.group(1).strip() | |
| print(f"π Extracted text via pattern: '{text_to_analyze}'") | |
| break | |
| print(f"π― Final text for sentiment analysis: '{text_to_analyze}'") | |
| return [SubTask( | |
| id=str(uuid.uuid4()), | |
| task_type=TaskType.SENTIMENT_ANALYSIS, | |
| description=f"Analyze sentiment: {text_to_analyze}", | |
| parameters={"text": text_to_analyze}, | |
| agent_id="sentiment_agent", | |
| confidence=0.8 | |
| )] | |
| elif any(word in query_lower for word in ["hiking", "trail", "trails", "trek", "trekking", "hike", "hikes"]): | |
| # Extract location and difficulty for hiking | |
| import re | |
| # Extract location patterns | |
| location_patterns = [ | |
| r"(?:in|at|near|around|close to)\s+([a-zA-Z\s,]+?)(?:\s+within|\s+\d|$|\.|,)", | |
| r"([A-Z][a-zA-Z\s]+?)(?:\s+within|\s+\d|$)" | |
| ] | |
| location = None | |
| for pattern in location_patterns: | |
| location_match = re.search(pattern, user_query) | |
| if location_match: | |
| location = location_match.group(1).strip() | |
| break | |
| if not location: | |
| location = user_query # Fallback to full query | |
| # Extract difficulty | |
| difficulty = None | |
| if "easy" in query_lower: | |
| difficulty = "easy" | |
| elif "moderate" in query_lower: | |
| difficulty = "moderate" | |
| elif any(word in query_lower for word in ["hard", "difficult", "challenging"]): | |
| difficulty = "hard" | |
| elif any(word in query_lower for word in ["very hard", "extreme", "strenuous"]): | |
| difficulty = "very_hard" | |
| # Extract distance | |
| max_distance = 30 # Default | |
| distance_match = re.search(r"within\s+(\d+)\s*(?:mile|miles|mi)", query_lower) | |
| if distance_match: | |
| max_distance = int(distance_match.group(1)) | |
| return [SubTask( | |
| id=str(uuid.uuid4()), | |
| task_type=TaskType.HIKING_SEARCH, | |
| description=f"Find hiking trails: {user_query}", | |
| parameters={"location": location, "difficulty": difficulty, "max_distance": max_distance}, | |
| agent_id="hiking_agent", | |
| confidence=0.8 | |
| )] | |
| elif any(word in query_lower for word in ["hotel", "accommodation", "stay", "place"]): | |
| return [SubTask( | |
| id=str(uuid.uuid4()), | |
| task_type=TaskType.LOCATION_SEARCH, | |
| description=f"Find accommodations: {user_query}", | |
| parameters={"query": user_query, "max_distance": 20}, | |
| agent_id="location_agent", | |
| confidence=0.7 | |
| )] | |
| elif any(word in query_lower for word in ["restaurant", "food", "dining", "eat"]): | |
| return [SubTask( | |
| id=str(uuid.uuid4()), | |
| task_type=TaskType.RESTAURANT_SEARCH, | |
| description=f"Find restaurants: {user_query}", | |
| parameters={"query": user_query, "cuisine": None}, | |
| agent_id="restaurant_agent", | |
| confidence=0.7 | |
| )] | |
| else: | |
| # Default to web search | |
| return [SubTask( | |
| id=str(uuid.uuid4()), | |
| task_type=TaskType.WEB_SEARCH, | |
| description=f"Web search: {user_query}", | |
| parameters={"query": user_query, "category": None}, | |
| agent_id="web_agent", | |
| confidence=0.6 | |
| )] | |
| async def test_ticker_detection(self, test_queries: List[str] = None) -> Dict[str, str]: | |
| """Test ticker detection on various queries to help debug issues.""" | |
| if test_queries is None: | |
| test_queries = [ | |
| "What's the current stock price of NVDA?", | |
| "NVDA stock price", | |
| "Get NVIDIA stock price", | |
| "What is TSLA trading at?", | |
| "Apple stock price", | |
| "AAPL current price" | |
| ] | |
| results = {} | |
| print("π§ͺ Testing ticker detection...") | |
| for query in test_queries: | |
| detected = await self.detect_ticker_symbol(query) | |
| results[query] = detected | |
| print(f" '{query}' β '{detected}'") | |
| return results | |
| async def detect_ticker_symbol(self, user_query: str) -> str: | |
| """ | |
| Use LLM to detect and extract ticker symbols from financial queries. | |
| """ | |
| try: | |
| # First check for obvious ticker symbols in the query | |
| import re | |
| print(f"π Analyzing query for ticker: '{user_query}'") | |
| # Common ticker patterns - improved to catch more cases | |
| ticker_patterns = [ | |
| r'\b([A-Z]{1,5})\b(?:\s+stock|\s+price|\s+quote)', # NVDA stock, AAPL price | |
| r'\bof\s+([A-Z]{2,5})\b', # "price of NVDA" | |
| r'\b([A-Z]{2,5})\s*\??\s*$', # NVDA at end of query | |
| r'\b([A-Z]{2,5})\b(?=\s)', # Standalone uppercase 2-5 letters followed by space | |
| r'\b([A-Z]{2,5})\b', # Any 2-5 letter uppercase sequence | |
| ] | |
| # Known ticker mappings for common companies | |
| company_tickers = { | |
| 'nvidia': 'NVDA', | |
| 'apple': 'AAPL', | |
| 'tesla': 'TSLA', | |
| 'microsoft': 'MSFT', | |
| 'google': 'GOOGL', | |
| 'amazon': 'AMZN', | |
| 'meta': 'META', | |
| 'facebook': 'META', | |
| 'spy': 'SPY', | |
| 'qqq': 'QQQ' | |
| } | |
| query_lower = user_query.lower() | |
| # Check for direct ticker matches first | |
| for i, pattern in enumerate(ticker_patterns): | |
| matches = re.findall(pattern, user_query, re.IGNORECASE) | |
| print(f" Pattern {i+1} ('{pattern}'): {matches}") | |
| for match in matches: | |
| if len(match) >= 2 and match.upper() not in ['THE', 'AND', 'FOR', 'ARE', 'BUT', 'NOT', 'YOU', 'ALL', 'CAN', 'HER', 'WAS', 'ONE', 'OUR', 'HAD', 'BUT', 'WHAT', 'BEEN', 'THAT', 'WITH', 'THIS']: | |
| print(f"π― Direct ticker pattern match found: {match.upper()}") | |
| return match.upper() | |
| # Check for company name matches | |
| for company, ticker in company_tickers.items(): | |
| if company in query_lower: | |
| print(f"π’ Company name match found: {company} β {ticker}") | |
| return ticker | |
| # Use LLM as fallback for complex cases | |
| prompt = f""" | |
| You are a financial assistant. Your task is to identify stock ticker symbols in queries. | |
| Query: "{user_query}" | |
| If this query mentions a company or stock, return ONLY the ticker symbol (e.g., "AAPL", "TSLA", "NVDA"). | |
| If no ticker can be identified, return "UNKNOWN". | |
| Examples: | |
| - "Apple stock price" β AAPL | |
| - "Tesla earnings" β TSLA | |
| - "NVIDIA performance" β NVDA | |
| - "Microsoft news" β MSFT | |
| - "What's the current stock price of NVDA?" β NVDA | |
| - "weather forecast" β UNKNOWN | |
| Response (ticker only):""" | |
| if self.model_name == "anthropic" and self.model: | |
| response = self.model([{"role": "user", "content": prompt}]) | |
| ticker = str(response).strip().upper() | |
| # Validate ticker format | |
| if ticker and ticker != "UNKNOWN" and len(ticker) <= 5 and ticker.isalpha(): | |
| print(f"π€ LLM ticker detection: {ticker}") | |
| return ticker | |
| print(f"β No ticker detected for query: {user_query}") | |
| return "UNKNOWN" | |
| except Exception as e: | |
| print(f"β Ticker detection failed: {e}") | |
| return "UNKNOWN" | |
| async def enhance_financial_query(self, user_query: str) -> str: | |
| """Enhance financial queries with ticker symbol detection.""" | |
| ticker_result = await self.detect_ticker_symbol(user_query) | |
| if ticker_result != "UNKNOWN": | |
| # Specific ticker found - create focused financial query | |
| enhanced_query = f"{ticker_result} stock price quote market data" | |
| print(f"π― Enhanced financial query: '{user_query}' β '{enhanced_query}' (ticker: {ticker_result})") | |
| return enhanced_query | |
| else: | |
| print(f"π Using original query: '{user_query}' (no ticker detected)") | |
| return user_query | |
| class MCPClientManager: | |
| """Enhanced MCP Client Manager with LLM-powered task decomposition.""" | |
| def __init__(self, server_url: str = "http://localhost:7861/gradio_api/mcp/sse", model_name: str = "anthropic"): | |
| self.server_url = server_url | |
| self.model_name = model_name | |
| self.task_decomposer = LLMTaskDecomposer(model_name) | |
| self.mcp_client = None | |
| self.session_id = str(uuid.uuid4()) | |
| self.is_connected = False | |
| self.available_tools = [] | |
| async def connect_to_server(self) -> bool: | |
| """Connect to an already running MCP server using smolagents MCPClient.""" | |
| max_retries = 3 if "localhost" in self.server_url else 2 | |
| retry_delay = 2 | |
| for attempt in range(max_retries): | |
| try: | |
| if attempt > 0: | |
| print(f"π Retry attempt {attempt + 1}/{max_retries}") | |
| await asyncio.sleep(retry_delay) | |
| print(f"π Attempting to connect to MCP server at: {self.server_url}") | |
| # Add timeout for remote connections | |
| timeout = 15 if "localhost" in self.server_url else 45 | |
| # Create MCP client using smolagents with explicit transport | |
| self.mcp_client = MCPClient({ | |
| "url": self.server_url, | |
| "transport": "sse", # Explicitly specify SSE transport | |
| "timeout": timeout | |
| }) | |
| # Get available tools with timeout | |
| try: | |
| print("π Fetching available tools...") | |
| self.available_tools = self.mcp_client.get_tools() | |
| tool_names = [tool.name for tool in self.available_tools] | |
| print(f"β Connected to MCP server. Available tools: {tool_names}") | |
| # Debug: Print detailed tool information | |
| if self.available_tools: | |
| print("π Tool Details:") | |
| for tool in self.available_tools: | |
| print(f" β’ {tool.name}") | |
| else: | |
| print("β οΈ Warning: No tools found on the server") | |
| # Check if tools have prefixes and suggest mapping | |
| if tool_names and any("_" in name for name in tool_names): | |
| print("π§ Detected prefixed tool names - using flexible matching") | |
| except Exception as tools_error: | |
| print(f"β οΈ Warning: Connected but failed to get tools: {tools_error}") | |
| self.available_tools = [] | |
| self.is_connected = True | |
| return True | |
| except Exception as e: | |
| error_msg = str(e) | |
| if "timeout" in error_msg.lower() or "connection" in error_msg.lower(): | |
| print(f"β±οΈ Connection attempt {attempt + 1} failed: {error_msg}") | |
| else: | |
| print(f"β Connection attempt {attempt + 1} failed: {error_msg}") | |
| if attempt == max_retries - 1: | |
| print(f"β Failed to connect to MCP server after {max_retries} attempts") | |
| print(f"π‘ Connection troubleshooting for: {self.server_url}") | |
| if "localhost" in self.server_url: | |
| print("π LOCAL SERVER ISSUES:") | |
| print(" β’ Make sure the MCP server is running locally") | |
| print(" β’ Check if port 7861 is available") | |
| print(" β’ Try running: python server.py in the mcp_server directory") | |
| else: | |
| print("π REMOTE SERVER ISSUES:") | |
| if "hf.space" in self.server_url: | |
| print(" β’ The Hugging Face Space might be PRIVATE (not publicly accessible)") | |
| print(" β’ Make the Space PUBLIC in HF settings, or") | |
| print(" β’ Use a local server instead") | |
| print(" β’ Check your internet connection") | |
| print(" β’ Verify the server URL is correct and accessible") | |
| await self._cleanup() | |
| return False | |
| async def execute_subtask(self, subtask: SubTask) -> Dict[str, Any]: | |
| """Execute a subtask using the MCP tool.""" | |
| if not self.is_connected or not self.mcp_client: | |
| return {"error": "Not connected to MCP server"} | |
| try: | |
| agent = self.task_decomposer.agents.get(subtask.agent_id) | |
| if not agent: | |
| return {"error": f"Agent {subtask.agent_id} not found"} | |
| tool_name = agent.tool_name | |
| # Debug: print available tools | |
| available_tool_names = [tool.name for tool in self.available_tools] | |
| print(f"π Looking for tool '{tool_name}' among available tools: {available_tool_names}") | |
| # Find the tool - support both exact matches and suffix matches (for prefixed tools) | |
| tool = None | |
| for available_tool in self.available_tools: | |
| # First try exact match | |
| if available_tool.name == tool_name: | |
| tool = available_tool | |
| print(f"β Found exact match: {available_tool.name}") | |
| break | |
| # Then try suffix match (for tools like "test_mcp_server_sentiment_analysis") | |
| elif available_tool.name.endswith(f"_{tool_name}") or available_tool.name.endswith(tool_name): | |
| tool = available_tool | |
| print(f"β Found suffix match: {available_tool.name} matches {tool_name}") | |
| break | |
| if not tool: | |
| return { | |
| "error": f"Tool {tool_name} not available on server", | |
| "available_tools": available_tool_names, | |
| "requested_tool": tool_name, | |
| "agent_id": subtask.agent_id | |
| } | |
| # Special handling for web search with ticker detection | |
| if tool_name == "web_search" and subtask.agent_id == "web_agent": | |
| # Enhance query with ticker detection for financial queries | |
| original_query = subtask.parameters.get("query", "") | |
| enhanced_query = await self.task_decomposer.enhance_financial_query(original_query) | |
| subtask.parameters["query"] = enhanced_query | |
| print(f"π‘ Web search query enhanced: '{original_query}' β '{enhanced_query}'") | |
| # Map and filter parameters based on tool type | |
| filtered_params = self._filter_tool_parameters(tool_name, subtask.parameters) | |
| print(f"π§ Filtered parameters for {tool_name}: {filtered_params}") | |
| # Execute the tool | |
| try: | |
| result = tool(**filtered_params) | |
| # Handle the result - parse JSON string if needed | |
| if isinstance(result, str): | |
| try: | |
| parsed_result = json.loads(result) | |
| except json.JSONDecodeError: | |
| parsed_result = {"result": result} | |
| elif isinstance(result, dict): | |
| parsed_result = result | |
| elif hasattr(result, 'content'): | |
| # If it's a tool result object | |
| parsed_result = {"result": str(result.content)} | |
| else: | |
| parsed_result = {"result": str(result)} | |
| subtask.status = "completed" | |
| subtask.result = parsed_result | |
| return parsed_result | |
| except Exception as tool_error: | |
| return {"error": f"Tool execution error: {str(tool_error)}", "tool_name": tool_name} | |
| except Exception as e: | |
| subtask.status = "failed" | |
| return {"error": f"Subtask execution failed: {str(e)}", "subtask_id": subtask.id} | |
| def _filter_tool_parameters(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]: | |
| """Filter and map parameters based on tool requirements.""" | |
| # Parameter mappings for each tool | |
| tool_param_mappings = { | |
| "web_search": { # Updated to handle actual web_search tool | |
| "allowed_params": ["query", "max_results"], | |
| "param_mapping": { | |
| "search_query": "query", | |
| "search_term": "query", | |
| "q": "query", | |
| "data_type": None, # Remove this parameter | |
| "category": None # Remove this parameter | |
| } | |
| }, | |
| "sentiment_analysis": { | |
| "allowed_params": ["text"], | |
| "param_mapping": { | |
| "input_text": "text", | |
| "content": "text" | |
| } | |
| }, | |
| "place_search": { | |
| "allowed_params": ["query", "max_distance"], | |
| "param_mapping": { | |
| "location": "query", | |
| "search_query": "query", | |
| "distance": "max_distance" | |
| } | |
| }, | |
| "restaurant_search": { | |
| "allowed_params": ["query", "cuisine"], | |
| "param_mapping": { | |
| "location": "query", | |
| "search_query": "query", | |
| "cuisine_type": "cuisine" | |
| } | |
| }, | |
| "hiking_search": { | |
| "allowed_params": ["location", "difficulty", "max_distance"], | |
| "param_mapping": { | |
| "query": "location", | |
| "search_query": "location", | |
| "skill_level": "difficulty" | |
| } | |
| } | |
| } | |
| mapping_config = tool_param_mappings.get(tool_name, { | |
| "allowed_params": ["query"], | |
| "param_mapping": {} | |
| }) | |
| filtered_params = {} | |
| for param_key, param_value in parameters.items(): | |
| # Check if parameter should be mapped to a different name | |
| mapped_key = mapping_config["param_mapping"].get(param_key, param_key) | |
| # Skip parameters that are mapped to None (should be removed) | |
| if mapped_key is None: | |
| continue | |
| # Skip None or empty values | |
| if param_value is None or param_value == "": | |
| continue | |
| # Only include allowed parameters | |
| if mapped_key in mapping_config["allowed_params"]: | |
| filtered_params[mapped_key] = param_value | |
| # Ensure required parameters exist with defaults | |
| if tool_name in ["web_search"] and "query" not in filtered_params: | |
| # If no query parameter, use the first available parameter value | |
| if parameters: | |
| filtered_params["query"] = str(list(parameters.values())[0]) | |
| elif tool_name == "hiking_search" and "location" not in filtered_params: | |
| # For hiking, ensure location is provided | |
| if "query" in parameters and parameters["query"]: | |
| filtered_params["location"] = str(parameters["query"]) | |
| elif tool_name == "restaurant_search" and "query" not in filtered_params: | |
| # For restaurants, ensure query is provided | |
| if "location" in parameters and parameters["location"]: | |
| filtered_params["query"] = str(parameters["location"]) | |
| elif tool_name == "place_search" and "query" not in filtered_params: | |
| # For places, ensure query is provided | |
| if "location" in parameters and parameters["location"]: | |
| filtered_params["query"] = str(parameters["location"]) | |
| return filtered_params | |
| def test_tool_matching(self) -> str: | |
| """Test tool matching logic for debugging purposes.""" | |
| if not self.available_tools: | |
| return "β No tools available to test" | |
| results = [] | |
| results.append("π§ͺ Tool Matching Test Results:") | |
| results.append("") | |
| # Test each agent's tool against available tools | |
| for agent_id, agent in self.task_decomposer.agents.items(): | |
| tool_name = agent.tool_name | |
| results.append(f"π Testing agent '{agent_id}' looking for tool '{tool_name}':") | |
| # Test exact match | |
| exact_match = None | |
| suffix_match = None | |
| for available_tool in self.available_tools: | |
| if available_tool.name == tool_name: | |
| exact_match = available_tool.name | |
| break | |
| elif available_tool.name.endswith(f"_{tool_name}") or available_tool.name.endswith(tool_name): | |
| suffix_match = available_tool.name | |
| if exact_match: | |
| results.append(f" β Exact match found: {exact_match}") | |
| elif suffix_match: | |
| results.append(f" β Suffix match found: {suffix_match}") | |
| else: | |
| results.append(f" β No match found") | |
| results.append("") | |
| return "\n".join(results) | |
| async def process_query(self, user_query: str) -> Dict[str, Any]: | |
| """Process user query through LLM-powered task decomposition and agent routing.""" | |
| try: | |
| # Ensure connection | |
| if not self.is_connected: | |
| success = await self.connect_to_server() | |
| if not success: | |
| return { | |
| "query": user_query, | |
| "error": "Failed to connect to MCP server. Please ensure the server is running separately.", | |
| "status": "failed" | |
| } | |
| # Use LLM to decompose query into subtasks | |
| subtasks = await self.task_decomposer.decompose_query(user_query) | |
| # Execute subtasks | |
| results = [] | |
| for subtask in subtasks: | |
| result = await self.execute_subtask(subtask) | |
| results.append({ | |
| "subtask_id": subtask.id, | |
| "task_type": subtask.task_type.value, | |
| "agent": subtask.agent_id, | |
| "description": subtask.description, | |
| "confidence": subtask.confidence, | |
| "result": result | |
| }) | |
| # Aggregate results | |
| response = { | |
| "query": user_query, | |
| "subtasks_count": len(subtasks), | |
| "subtasks": results, | |
| "status": "completed", | |
| "summary": self._generate_summary(user_query, results) | |
| } | |
| return response | |
| except Exception as e: | |
| print(f"β Failed to process query: {e}") | |
| return { | |
| "query": user_query, | |
| "error": f"Query processing failed: {str(e)}", | |
| "status": "failed" | |
| } | |
| def _generate_summary(self, query: str, results: List[Dict[str, Any]]) -> str: | |
| """Generate summary of all subtask results - now simplified since server handles formatting.""" | |
| try: | |
| if not results: | |
| return f"# π€ No Results\n\nNo results available for your query. Please try a different search term.\n\n---\n*π§ Powered by AI Task Decomposition*" | |
| # Check if we have a pre-formatted summary from server | |
| for result in results: | |
| try: | |
| if isinstance(result, dict) and isinstance(result.get("result"), dict): | |
| # Check for direct summary first | |
| if result["result"].get("summary"): | |
| return result["result"]["summary"] | |
| # Check for formatted result content | |
| elif result["result"].get("result"): | |
| formatted_content = result["result"]["result"] | |
| if isinstance(formatted_content, str) and "π" in formatted_content: | |
| # This is pre-formatted content from server - return it directly | |
| return formatted_content | |
| except (KeyError, TypeError, AttributeError) as e: | |
| print(f"β οΈ Warning: Error accessing result summary: {e}") | |
| continue | |
| # Generate custom formatted summary | |
| summary_parts = [f"# π― Results for: *{query}*", ""] | |
| for i, result in enumerate(results, 1): | |
| try: | |
| task_type = result.get("task_type", "unknown") | |
| description = result.get("description", "No description") | |
| confidence = result.get("confidence", 0.5) | |
| result_data = result.get("result", {}) | |
| if isinstance(result_data, dict) and "error" not in result_data: | |
| summary_parts.append(f"## π Task {i}: {task_type.replace('_', ' ').title()}") | |
| summary_parts.append(f"**Confidence:** {confidence:.1%}") | |
| summary_parts.append("") | |
| # Extract and format the actual content | |
| if result_data.get("summary"): | |
| # Direct summary | |
| summary_parts.append(result_data["summary"]) | |
| elif result_data.get("result"): | |
| # Extract formatted content from nested result | |
| content = result_data["result"] | |
| if isinstance(content, str): | |
| # Clean up any escaped newlines and display formatted content | |
| formatted_content = content.replace('\\n', '\n').replace('\\t', '\t') | |
| summary_parts.append(formatted_content) | |
| else: | |
| # Handle other data types | |
| summary_parts.append(self._format_result_content(content)) | |
| else: | |
| # Fallback - format the entire result_data | |
| summary_parts.append(self._format_result_content(result_data)) | |
| summary_parts.append("") | |
| else: | |
| # Handle errors | |
| summary_parts.append(f"## β {task_type.replace('_', ' ').title()} - Error") | |
| if isinstance(result_data, dict): | |
| error_msg = result_data.get('error', 'Unknown error occurred') | |
| else: | |
| error_msg = str(result_data) | |
| summary_parts.append(f"**Issue:** {error_msg}") | |
| summary_parts.append("") | |
| except Exception as e: | |
| print(f"β οΈ Warning: Error processing result {i}: {e}") | |
| summary_parts.append(f"## β Task {i} - Processing Error") | |
| summary_parts.append(f"**Issue:** {str(e)}") | |
| summary_parts.append("") | |
| summary_parts.append("---") | |
| return "\n".join(summary_parts) | |
| except Exception as e: | |
| print(f"β Error in _generate_summary: {e}") | |
| return f"# β Summary Generation Error\n\nFailed to generate summary: {str(e)}\n\n---\n*π§ Powered by AI Task Decomposition*" | |
| async def _cleanup(self): | |
| """Clean up resources.""" | |
| if self.mcp_client: | |
| try: | |
| self.mcp_client.disconnect() | |
| except: | |
| pass | |
| self.mcp_client = None | |
| self.is_connected = False | |
| async def disconnect(self): | |
| """Disconnect from MCP server.""" | |
| await self._cleanup() | |
| print("π Disconnected from MCP server") | |
| def create_mcp_client_interface(server_url: str = "http://localhost:7861/gradio_api/mcp/sse", model_name: str = "anthropic"): | |
| """Create the Gradio interface for MCP Client with LLM-powered task decomposition.""" | |
| # Create the client manager | |
| async def process_query(query: str): | |
| """Process user query through MCP with LLM decomposition.""" | |
| if not query.strip(): | |
| return "Please enter a query to process." | |
| try: | |
| # Create a new client manager for each query to ensure fresh connection | |
| client_manager = MCPClientManager(server_url, model_name) | |
| # Process the query | |
| result = await client_manager.process_query(query) | |
| # Clean up | |
| await client_manager.disconnect() | |
| if result and 'summary' in result: | |
| return result['summary'] | |
| else: | |
| return "β No results found or error occurred during processing." | |
| except Exception as e: | |
| return f"β Error processing query: {str(e)}" | |
| # Check available models and API keys | |
| available_models = [] | |
| if ANTHROPIC_API_KEY: | |
| available_models.append(("π€ Claude Sonnet 4 via Anthropic", "anthropic")) | |
| if not available_models: | |
| available_models.append(("β No API Keys Configured", "none")) | |
| # Custom CSS for better UI | |
| css = """ | |
| .gradio-container { | |
| max-width: 1200px !important; | |
| margin: auto !important; | |
| } | |
| .header-container { | |
| background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); | |
| padding: 2rem; | |
| border-radius: 15px; | |
| margin-bottom: 2rem; | |
| color: white; | |
| text-align: center; | |
| } | |
| .model-info { | |
| background: #f8fafc; | |
| border: 1px solid #e2e8f0; | |
| border-radius: 10px; | |
| padding: 1rem; | |
| margin: 1rem 0; | |
| } | |
| .example-btn { | |
| margin: 0.25rem !important; | |
| background: linear-gradient(45deg, #4f46e5, #7c3aed) !important; | |
| border: none !important; | |
| color: white !important; | |
| } | |
| .example-btn:hover { | |
| transform: translateY(-2px); | |
| box-shadow: 0 4px 12px rgba(79, 70, 229, 0.4) !important; | |
| } | |
| .input-section { | |
| background: #ffffff; | |
| border: 1px solid #e5e7eb; | |
| border-radius: 12px; | |
| padding: 1.5rem; | |
| margin: 1rem 0; | |
| box-shadow: 0 2px 8px rgba(0,0,0,0.05); | |
| } | |
| .results-container { | |
| background: #ffffff; | |
| border: 1px solid #e5e7eb; | |
| border-radius: 12px; | |
| padding: 1.5rem; | |
| margin: 1rem 0; | |
| box-shadow: 0 2px 8px rgba(0,0,0,0.05); | |
| } | |
| .control-buttons { | |
| margin-top: 1rem; | |
| gap: 1rem; | |
| } | |
| .markdown-content { | |
| line-height: 1.6; | |
| } | |
| """ | |
| with gr.Blocks(css=css, title="π VOYAGER AI") as demo: | |
| # Header Section | |
| with gr.Column(elem_classes="header-container"): | |
| gr.HTML(""" | |
| <h1 style="margin: 0; font-size: 2.5rem; font-weight: bold;"> | |
| π VOYAGER AI | |
| </h1> | |
| <p style="margin: 0.5rem 0 0 0; font-size: 1.2rem; opacity: 0.9;"> | |
| Intelligent AI Assistant with Multi-Agent Coordination | |
| </p> | |
| """) | |
| # Main Interface | |
| with gr.Column(): | |
| # Examples Section | |
| with gr.Column(): | |
| gr.HTML(""" | |
| <h3 style="color: #1f2937; margin: 20px 0 15px 0; font-size: 18px; font-weight: 600;"> | |
| π‘ Quick Start - Try These Examples: | |
| </h3> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| sentiment_btn = gr.Button("π Analyze Sentiment", elem_classes=["example-btn"]) | |
| hiking_btn = gr.Button("ποΈ Hiking trails near Denver", elem_classes=["example-btn"]) | |
| with gr.Column(scale=1): | |
| stock_btn = gr.Button("π Stock Prices", elem_classes=["example-btn"]) | |
| news_btn = gr.Button("π° Latest News", elem_classes=["example-btn"]) | |
| with gr.Column(scale=1): | |
| hotel_btn = gr.Button("π¨ Find Hotels", elem_classes=["example-btn"]) | |
| restaurant_btn = gr.Button("π½οΈ Find Restaurants", elem_classes=["example-btn"]) | |
| # Input Section | |
| with gr.Column(elem_classes="input-section"): | |
| # Query Input | |
| query_input = gr.Textbox( | |
| placeholder="π¬ Ask me anything... (e.g., 'What's NVIDIA's stock price?' or 'Find hotels and restaurants in New york')", | |
| lines=3, | |
| label="Your Query", | |
| show_label=False, | |
| container=False | |
| ) | |
| # Control Buttons | |
| with gr.Row(elem_classes="control-buttons"): | |
| submit_btn = gr.Button( | |
| "π§ Analyze & Execute Query", | |
| variant="primary", | |
| size="lg", | |
| scale=2 | |
| ) | |
| clear_btn = gr.Button( | |
| "π Clear All", | |
| variant="secondary", | |
| size="lg", | |
| scale=1 | |
| ) | |
| # Results Display | |
| with gr.Column(elem_classes="results-container"): | |
| gr.HTML(""" | |
| <h3 style="color: #1f2937; margin: 0 0 20px 0; font-size: 18px; font-weight: 600;"> | |
| π― AI Results & Analysis | |
| </h3> | |
| """) | |
| results_output = gr.Markdown( | |
| value=f"""**Welcome to VOYAGER AI!** π§ | |
| **π― How it works:** | |
| 1. **Try one of the examples above** or type your question naturally | |
| 2. **Click "Analyze & Execute"** to get intelligent results | |
| **β¨ Features:** | |
| β’ π **Smart Query Analysis** - AI understands your intent | |
| β’ π **Task Decomposition** - Complex queries broken down into subtasks | |
| β’ π€ **Agent Routing** - Specialized agents for different tasks | |
| β’ β‘ **Real-time Data** - Live web search and current information | |
| β’ π¨ **Professional Results** - Clean, formatted responses | |
| **π Ready to start?** Try one of the example buttons above or type your own query! | |
| --- | |
| π€ **Current Model:** {available_models[0][0] if available_models[0][1] != "none" else "No API Keys Configured"}""", | |
| show_label=False, | |
| container=False, | |
| elem_classes=["markdown-content"] | |
| ) | |
| # Event handlers | |
| submit_btn.click( | |
| process_query, | |
| inputs=[query_input], | |
| outputs=[results_output] | |
| ) | |
| clear_btn.click( | |
| fn=lambda: ("", """**Interface Cleared!** π§Ή | |
| Ready for your next query. Try the example buttons above or ask me anything! | |
| π‘ **Quick Tips:** | |
| - Try asking about stock prices, weather, news, or travel | |
| - Use natural language - no need for specific commands | |
| - Complex queries are automatically broken down into tasks"""), | |
| outputs=[query_input, results_output] | |
| ) | |
| query_input.submit( | |
| process_query, | |
| inputs=[query_input], | |
| outputs=[results_output] | |
| ) | |
| # Example button handlers with better queries | |
| sentiment_btn.click( | |
| fn=lambda: "Analyze sentiment: 'I absolutely love this new AI technology - it's revolutionary and amazing!'", | |
| outputs=query_input | |
| ) | |
| hiking_btn.click( | |
| fn=lambda: "Find moderate hiking trails near Denver", | |
| outputs=query_input | |
| ) | |
| stock_btn.click( | |
| fn=lambda: "What's the current stock price of SPY?", | |
| outputs=query_input | |
| ) | |
| news_btn.click( | |
| fn=lambda: "Latest news about artificial intelligence and technology", | |
| outputs=query_input | |
| ) | |
| hotel_btn.click( | |
| fn=lambda: "Find luxury hotels in New york", | |
| outputs=query_input | |
| ) | |
| restaurant_btn.click( | |
| fn=lambda: "Best Italian restaurants in New York", | |
| outputs=query_input | |
| ) | |
| return demo | |
| async def main(): | |
| """Main entry point for the LLM-Powered MCP Client.""" | |
| # Set up environment first | |
| setup_environment() | |
| parser = argparse.ArgumentParser(description="LLM-Powered MCP Client with Intelligent Task Decomposition") | |
| parser.add_argument( | |
| "--server-url", | |
| default="https://srikanthnagelli-agents-mcp-hackathon.hf.space/gradio_api/mcp/sse", | |
| help="MCP server URL (default: https://srikanthnagelli-agents-mcp-hackathon.hf.space/gradio_api/mcp/sse)" | |
| ) | |
| parser.add_argument( | |
| "--local", | |
| action="store_true", | |
| help="Use local MCP server (http://localhost:7860/gradio_api/mcp/sse) instead of remote server" | |
| ) | |
| parser.add_argument( | |
| "--port", | |
| type=int, | |
| default=7862, | |
| help="Port to run the client interface (default: 7862)" | |
| ) | |
| parser.add_argument( | |
| "--model", | |
| default="anthropic", | |
| choices=["anthropic"], | |
| help="LLM model for task decomposition (default: anthropic)" | |
| ) | |
| args = parser.parse_args() | |
| # Override server URL if --local flag is used | |
| if args.local: | |
| server_url = "http://localhost:7860/gradio_api/mcp/sse" | |
| print("π Local development mode enabled") | |
| else: | |
| server_url = args.server_url | |
| print("βοΈ Using remote MCP server") | |
| print("π VOYAGER AI - LLM-Powered Task Decomposition") | |
| print("π§ Intelligent query analysis and agent coordination") | |
| print("π€ Available Models:") | |
| if ANTHROPIC_API_KEY: | |
| print(" β’ β Claude Sonnet 4 via Anthropic") | |
| else: | |
| print(" β’ β Claude Sonnet 4 (API key missing)") | |
| print("") | |
| print(f"π‘ MCP Server: {server_url}") | |
| print(f"π§ Default Model: {args.model}") | |
| print("β" * 50) | |
| # Validate selected model | |
| if args.model == "anthropic" and not ANTHROPIC_API_KEY: | |
| print("β οΈ Warning: Anthropic model selected but API key not configured") | |
| # Create and launch LLM-powered interface | |
| demo = create_mcp_client_interface(server_url, args.model) | |
| print("π Interface ready! Select your model and ask anything naturally!") | |
| # Launch the interface - different configs for local vs deployment | |
| if args.local: | |
| # Local development - with share link | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=args.port, | |
| share=True, | |
| show_error=True | |
| ) | |
| else: | |
| # Production deployment (e.g., Hugging Face Spaces) | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| show_error=True | |
| ) | |
| if __name__ == "__main__": | |
| try: | |
| asyncio.run(main()) | |
| except KeyboardInterrupt: | |
| print("π Client shutdown requested") | |
| except Exception as e: | |
| print(f"β Client error: {e}") | |
| sys.exit(1) |