| | |
| | """ |
| | Enhanced MCP Client with LLM-based task decomposition, intelligent agent routing, and real MCP protocol. |
| | This client uses AI for smart query analysis and agent coordination instead of hard-coded rules. |
| | """ |
| | import gradio as gr |
| | import asyncio |
| | import json |
| | import os |
| | import sys |
| | import argparse |
| | from typing import Dict, List, Any, Optional |
| | from dataclasses import dataclass |
| | from enum import Enum |
| | import uuid |
| | from pathlib import Path |
| | import requests |
| | from smolagents import MCPClient, LiteLLMModel |
| |
|
| | ANTHROPIC_API_KEY = os.environ.get('ANTHROPIC_API_KEY') |
| |
|
| | def setup_environment(): |
| | """Set up environment variables and configuration.""" |
| | global ANTHROPIC_API_KEY |
| | |
| | |
| | print("\nπ API Configuration:") |
| | print(f"Anthropic API Key: {'β Configured' if ANTHROPIC_API_KEY else 'β Missing'}") |
| | |
| | if not ANTHROPIC_API_KEY: |
| | print("β οΈ Warning: ANTHROPIC_API_KEY not found in environment") |
| | print("π‘ Set environment variable: ANTHROPIC_API_KEY=your_anthropic_key") |
| |
|
| | class TaskType(Enum): |
| | """Types of tasks that can be decomposed.""" |
| | SENTIMENT_ANALYSIS = "sentiment_analysis" |
| | LOCATION_SEARCH = "place_search" |
| | RESTAURANT_SEARCH = "restaurant_search" |
| | HIKING_SEARCH = "hiking_search" |
| | WEB_SEARCH = "web_search" |
| | COMPLEX_QUERY = "complex_query" |
| |
|
| | @dataclass |
| | class SubTask: |
| | """Represents a sub-atomic task.""" |
| | id: str |
| | task_type: TaskType |
| | description: str |
| | parameters: Dict[str, Any] |
| | agent_id: str |
| | confidence: float = 0.5 |
| | status: str = "pending" |
| | result: Optional[Dict[str, Any]] = None |
| |
|
| | @dataclass |
| | class Agent: |
| | """Represents a dedicated agent for handling specific tools.""" |
| | id: str |
| | name: str |
| | tool_name: str |
| | description: str |
| | capabilities: List[str] |
| | keywords: List[str] |
| |
|
| | class LLMTaskDecomposer: |
| | """LLM-powered task decomposer using system prompts for intelligent query analysis.""" |
| | |
| | def __init__(self, model_name: str = "anthropic"): |
| | """Initialize with support for multiple LLM providers.""" |
| | self.model_name = model_name.lower() |
| | self.model = None |
| | |
| | |
| | self._initialize_model() |
| | self.agents = self._initialize_agents() |
| | |
| | def _initialize_model(self): |
| | """Initialize the selected model with proper error handling.""" |
| | try: |
| | if self.model_name == "anthropic": |
| | if not ANTHROPIC_API_KEY: |
| | print("β ANTHROPIC_API_KEY environment variable is required for Anthropic model") |
| | print("π‘ Model will fall back to keyword-based decomposition") |
| | self.model = None |
| | return |
| | |
| | print(f"π§ Initializing Anthropic model...") |
| | self.model = LiteLLMModel( |
| | model_id="anthropic/claude-sonnet-4-20250514", |
| | temperature=0.2, |
| | api_key=ANTHROPIC_API_KEY |
| | ) |
| | |
| | |
| | try: |
| | test_response = self.model([{"role": "user", "content": "Hello"}]) |
| | print(f"β
Anthropic model initialized and tested successfully") |
| | print(f"π§ Model response test: {str(test_response)[:50]}...") |
| | except Exception as test_error: |
| | print(f"β οΈ Model initialized but test call failed: {test_error}") |
| | print(f"π Will attempt to use model anyway, with fallback to keywords") |
| | else: |
| | print(f"β Unknown model name: {self.model_name}") |
| | self.model = None |
| | |
| | except Exception as e: |
| | print(f"β Model initialization failed: {e}") |
| | print(f"π Falling back to keyword-based decomposition") |
| | self.model = None |
| | |
| | def get_model_info(self) -> Dict[str, str]: |
| | """Get information about the current model.""" |
| | if self.model_name == "anthropic": |
| | return { |
| | "name": "Claude Sonnet 4", |
| | "provider": "Anthropic", |
| | "emoji": "π€", |
| | "model_id": "anthropic/claude-sonnet-4-20250514", |
| | "status": "initialized" if self.model else "failed" |
| | } |
| | else: |
| | return { |
| | "name": "Unknown Model", |
| | "provider": "Unknown", |
| | "emoji": "β", |
| | "model_id": "unknown", |
| | "status": "failed" |
| | } |
| | |
| | def _initialize_agents(self) -> Dict[str, Agent]: |
| | """Initialize specialized agents with their capabilities and keywords.""" |
| | agents = { |
| | "sentiment_agent": Agent( |
| | id="sentiment_agent", |
| | name="Sentiment Analysis Agent", |
| | tool_name="sentiment_analysis", |
| | description="Analyzes text sentiment, emotions, and opinions", |
| | capabilities=["text_analysis", "emotion_detection", "polarity_scoring", "opinion_mining"], |
| | keywords=["sentiment", "feeling", "opinion", "review", "emotion", "mood", "analyze text", "positive", "negative", "happy", "sad", "angry", "excited"] |
| | ), |
| | "location_agent": Agent( |
| | id="location_agent", |
| | name="Location Search Agent", |
| | tool_name="place_search", |
| | description="Finds hotels, accommodations, and places to stay", |
| | capabilities=["place_search", "hotel_finder", "accommodation_search", "lodging_recommendations"], |
| | keywords=["hotel", "hotels", "stay", "accommodation", "lodging", "motel", "resort", "inn", "bed and breakfast", "airbnb", "place to stay"] |
| | ), |
| | "restaurant_agent": Agent( |
| | id="restaurant_agent", |
| | name="Restaurant Search Agent", |
| | tool_name="restaurant_search", |
| | description="Discovers restaurants, food places, and dining options", |
| | capabilities=["restaurant_search", "cuisine_finder", "dining_recommendations", "food_discovery"], |
| | keywords=["restaurant", "restaurants", "food", "dining", "eat", "dinner", "lunch", "breakfast", "cafe", "bar", "cuisine", "meal", "dining out"] |
| | ), |
| | "hiking_agent": Agent( |
| | id="hiking_agent", |
| | name="Hiking Search Agent", |
| | tool_name="hiking_search", |
| | description="Finds hiking trails, outdoor activities, and nature spots", |
| | capabilities=["trail_finder", "outdoor_activities", "difficulty_assessment", "nature_exploration"], |
| | keywords=["hike", "hiking", "trail", "trails", "trek", "trekking", "outdoor", "mountain", "nature", "walk", "walking", "climbing", "adventure"] |
| | ), |
| | "web_agent": Agent( |
| | id="web_agent", |
| | name="Web Search Agent", |
| | tool_name="web_search", |
| | description="Searches web for information, news, weather, finance, and general queries with intelligent ticker detection for financial data", |
| | capabilities=["web_search", "information_retrieval", "real_time_data", "news_search", "weather_data", "financial_data", "ticker_detection"], |
| | keywords=["search", "find", "lookup", "google", "web", "information", "weather", "news", "current", "latest", "what is", "definition", "stock", "price", "market", "finance"] |
| | ) |
| | } |
| | return agents |
| | |
| | async def decompose_query(self, user_query: str) -> List[SubTask]: |
| | """ |
| | Use LLM to analyze user query and decompose into actionable subtasks. |
| | """ |
| | print(f"π Decomposing query: '{user_query}'") |
| | |
| | try: |
| | |
| | system_prompt = self._create_decomposition_prompt() |
| | |
| | |
| | user_message = f"""Query to analyze: "{user_query}" |
| | |
| | Please analyze this query and respond with a JSON object containing your analysis.""" |
| | |
| | print(f"π§ Attempting LLM decomposition with model: {self.model_name}") |
| | |
| | |
| | if self.model_name == "anthropic" and self.model is not None: |
| | |
| | try: |
| | print(f"π‘ Calling LLM model...") |
| | response = self.model([ |
| | {"role": "system", "content": system_prompt}, |
| | {"role": "user", "content": user_message} |
| | ]) |
| | print(f"β
LLM response received: {str(response)[:200]}...") |
| | print(f"π Response type: {type(response)}") |
| | except Exception as e: |
| | print(f"β Model call failed: {e}") |
| | print(f"π Falling back to keyword-based decomposition") |
| | return self._fallback_decomposition(user_query) |
| | else: |
| | print(f"β Model not available ({self.model_name}), using fallback") |
| | return self._fallback_decomposition(user_query) |
| | |
| | |
| | print(f"π Parsing LLM response...") |
| | analysis = self._parse_llm_response(response, user_query) |
| | print(f"π Analysis result: {analysis}") |
| | |
| | |
| | print(f"π― Creating subtasks from analysis...") |
| | subtasks = self._create_subtasks(analysis, user_query) |
| | print(f"β
Generated {len(subtasks)} subtasks") |
| | |
| | if not subtasks: |
| | print("β οΈ No subtasks generated, using fallback") |
| | return self._fallback_decomposition(user_query) |
| | |
| | |
| | for i, subtask in enumerate(subtasks): |
| | print(f" π Subtask {i+1}: {subtask.agent_id} -> {subtask.description}") |
| | |
| | return subtasks |
| | |
| | except Exception as e: |
| | print(f"β Task decomposition failed: {e}") |
| | print(f"π Using fallback decomposition") |
| | return self._fallback_decomposition(user_query) |
| | |
| | def _create_decomposition_prompt(self) -> str: |
| | """Create comprehensive system prompt for task decomposition.""" |
| | agent_descriptions = [] |
| | for agent_id, agent in self.agents.items(): |
| | agent_descriptions.append(f""" |
| | **{agent.name}** ({agent_id}): |
| | - Description: {agent.description} |
| | - Tool: {agent.tool_name} |
| | - Keywords: {', '.join(agent.keywords[:10])} |
| | - Capabilities: {', '.join(agent.capabilities)} |
| | """) |
| | |
| | return f"""You are an intelligent task decomposer for a multi-agent system. Your job is to analyze user queries and route them to the most appropriate specialized agents. |
| | |
| | AVAILABLE AGENTS: |
| | {chr(10).join(agent_descriptions)} |
| | |
| | TASK DECOMPOSITION RULES: |
| | 1. **Analyze Intent**: Identify the primary purpose of the user's query |
| | 2. **Extract Entities**: Find locations, keywords, parameters, and specific requirements |
| | 3. **Route Intelligently**: Choose the most appropriate agent(s) based on intent and entities |
| | 4. **Handle Complex Queries**: Break down multi-intent queries into separate tasks |
| | 5. **Provide Fallbacks**: Use web_agent for ambiguous or unsupported queries |
| | |
| | RESPONSE FORMAT: |
| | Always respond with valid JSON in this exact format: |
| | {{ |
| | "analysis": {{ |
| | "query_type": "simple|complex|ambiguous", |
| | "primary_intent": "brief description of main intent", |
| | "complexity_score": 0.0-1.0, |
| | "location_extracted": "location if found or null", |
| | "entities": ["entity1", "entity2"], |
| | "reasoning": "brief explanation of your analysis" |
| | }}, |
| | "tasks": [ |
| | {{ |
| | "task_id": "unique_id", |
| | "agent_id": "agent_name", |
| | "description": "clear task description", |
| | "parameters": {{"param1": "value1"}}, |
| | "confidence": 0.0-1.0, |
| | "priority": 1-5 |
| | }} |
| | ] |
| | }} |
| | |
| | TOOL PARAMETER SPECIFICATIONS: |
| | - **web_search**: {{"query": "search_terms", "max_results": 5}} |
| | - **sentiment_analysis**: {{"text": "text_to_analyze"}} |
| | - **place_search**: {{"query": "location", "max_distance": 20}} |
| | - **restaurant_search**: {{"query": "location", "cuisine": "cuisine_type_or_null"}} |
| | - **hiking_search**: {{"location": "location", "difficulty": "easy|moderate|hard|null", "max_distance": 30}} |
| | |
| | COMPREHENSIVE EXAMPLES: |
| | |
| | **Financial/Stock Queries (Enhanced with Ticker Detection):** |
| | Query: "What's NVIDIA's current stock price?" |
| | {{ |
| | "analysis": {{"query_type": "simple", "primary_intent": "get financial data", "complexity_score": 0.3, "location_extracted": null, "entities": ["NVIDIA", "stock price"], "reasoning": "Financial query for real-time stock data - ticker detection will enhance this"}}, |
| | "tasks": [{{"task_id": "web_001", "agent_id": "web_agent", "description": "Get current NVIDIA stock price with intelligent ticker detection", "parameters": {{"query": "NVIDIA stock price", "max_results": 5}}, "confidence": 0.95, "priority": 1}}] |
| | }} |
| | |
| | **General Web Search:** |
| | Query: "Latest news about AI technology" |
| | {{ |
| | "analysis": {{"query_type": "simple", "primary_intent": "search for news", "complexity_score": 0.3, "location_extracted": null, "entities": ["news", "AI", "technology"], "reasoning": "General web search for current information"}}, |
| | "tasks": [{{"task_id": "web_002", "agent_id": "web_agent", "description": "Search for latest AI technology news", "parameters": {{"query": "latest AI technology news", "max_results": 5}}, "confidence": 0.9, "priority": 1}}] |
| | }} |
| | |
| | **Hiking/Outdoor Queries:** |
| | Query: "Find moderate hiking trails near Seattle within 30 miles" |
| | {{ |
| | "analysis": {{"query_type": "simple", "primary_intent": "find hiking trails", "complexity_score": 0.4, "location_extracted": "Seattle", "entities": ["hiking", "trails", "moderate", "Seattle", "30 miles"], "reasoning": "Outdoor activity search with specific location and difficulty"}}, |
| | "tasks": [{{"task_id": "hiking_001", "agent_id": "hiking_agent", "description": "Find moderate hiking trails near Seattle", "parameters": {{"location": "Seattle", "difficulty": "moderate", "max_distance": 30}}, "confidence": 0.95, "priority": 1}}] |
| | }} |
| | |
| | **Hotel/Accommodation Queries:** |
| | Query: "Best luxury hotels in Paris near Eiffel Tower" |
| | {{ |
| | "analysis": {{"query_type": "simple", "primary_intent": "find accommodation", "complexity_score": 0.4, "location_extracted": "Paris", "entities": ["hotels", "luxury", "Paris", "Eiffel Tower"], "reasoning": "Accommodation search with location and luxury preference"}}, |
| | "tasks": [{{"task_id": "place_001", "agent_id": "location_agent", "description": "Find luxury hotels in Paris near Eiffel Tower", "parameters": {{"query": "luxury hotels Paris near Eiffel Tower", "max_distance": 20}}, "confidence": 0.9, "priority": 1}}] |
| | }} |
| | |
| | **Restaurant/Food Queries:** |
| | Query: "Italian restaurants in New York with outdoor seating" |
| | {{ |
| | "analysis": {{"query_type": "simple", "primary_intent": "find restaurants", "complexity_score": 0.4, "location_extracted": "New York", "entities": ["Italian", "restaurants", "New York", "outdoor seating"], "reasoning": "Restaurant search with cuisine and location preferences"}}, |
| | "tasks": [{{"task_id": "rest_001", "agent_id": "restaurant_agent", "description": "Find Italian restaurants in New York with outdoor seating", "parameters": {{"query": "New York", "cuisine": "Italian"}}, "confidence": 0.9, "priority": 1}}] |
| | }} |
| | |
| | **Sentiment Analysis Queries:** |
| | Query: "Analyze sentiment: 'This product is amazing and exceeded my expectations!'" |
| | {{ |
| | "analysis": {{"query_type": "simple", "primary_intent": "analyze text sentiment", "complexity_score": 0.2, "location_extracted": null, "entities": ["sentiment", "text analysis"], "reasoning": "Clear sentiment analysis request with provided text"}}, |
| | "tasks": [{{"task_id": "sent_001", "agent_id": "sentiment_agent", "description": "Analyze sentiment of product review", "parameters": {{"text": "This product is amazing and exceeded my expectations!"}}, "confidence": 0.95, "priority": 1}}] |
| | }} |
| | |
| | **Complex Multi-Intent Queries:** |
| | Query: "I'm planning a trip to Tokyo - need hotels and restaurants" |
| | {{ |
| | "analysis": {{"query_type": "complex", "primary_intent": "travel planning with accommodation and dining", "complexity_score": 0.7, "location_extracted": "Tokyo", "entities": ["trip", "Tokyo", "hotels", "restaurants"], "reasoning": "Multi-intent travel query requiring both accommodation and restaurant search"}}, |
| | "tasks": [ |
| | {{"task_id": "place_001", "agent_id": "location_agent", "description": "Find hotels in Tokyo", "parameters": {{"query": "Tokyo", "max_distance": 20}}, "confidence": 0.9, "priority": 1}}, |
| | {{"task_id": "rest_001", "agent_id": "restaurant_agent", "description": "Find restaurants in Tokyo", "parameters": {{"query": "Tokyo", "cuisine": null}}, "confidence": 0.9, "priority": 1}} |
| | ] |
| | }} |
| | |
| | **Weather/News/General Web Queries:** |
| | Query: "Latest news about artificial intelligence developments" |
| | {{ |
| | "analysis": {{"query_type": "simple", "primary_intent": "get current news information", "complexity_score": 0.3, "location_extracted": null, "entities": ["news", "artificial intelligence"], "reasoning": "Information retrieval query requiring web search"}}, |
| | "tasks": [{{"task_id": "web_001", "agent_id": "web_agent", "description": "Get latest AI news", "parameters": {{"query": "latest news artificial intelligence developments"}}, "confidence": 0.9, "priority": 1}}] |
| | }} |
| | |
| | **Ambiguous Queries:** |
| | Query: "Tell me about Paris" |
| | {{ |
| | "analysis": {{"query_type": "ambiguous", "primary_intent": "get general information", "complexity_score": 0.5, "location_extracted": "Paris", "entities": ["Paris"], "reasoning": "Vague query - could be travel, history, or general info - use web search"}}, |
| | "tasks": [{{"task_id": "web_001", "agent_id": "web_agent", "description": "Get general information about Paris", "parameters": {{"query": "Paris information travel guide"}}, "confidence": 0.7, "priority": 1}}] |
| | }} |
| | |
| | INTELLIGENT ROUTING GUIDELINES: |
| | - **Keywords for hiking_agent**: hiking, trails, trek, outdoor, mountain, nature, walk, climbing, adventure |
| | - **Keywords for location_agent**: hotel, hotels, accommodation, stay, lodging, motel, resort, inn |
| | - **Keywords for restaurant_agent**: restaurant, food, dining, eat, cuisine, meal, cafe, bar |
| | - **Keywords for sentiment_agent**: sentiment, analyze, opinion, feeling, emotion, review, mood |
| | - **Keywords for web_agent**: news, weather, stock, price, latest, current, information, what is |
| | |
| | PARAMETER EXTRACTION RULES: |
| | - **Locations**: Look for city names, landmarks, "in", "near", "at", "around" |
| | - **Difficulties**: easy, moderate, hard, difficult, challenging, extreme |
| | - **Distances**: "within X miles", "X km radius", "close to" |
| | - **Cuisines**: Italian, Chinese, Mexican, etc. |
| | - **Accommodations**: luxury, budget, boutique, business, etc. |
| | |
| | IMPORTANT: |
| | - Always provide valid JSON |
| | - Use exact agent_id values from the list above |
| | - Extract locations and parameters accurately |
| | - Assign appropriate confidence scores based on query clarity |
| | - For unclear queries, use web_agent as fallback |
| | - Be specific in task descriptions and reasoning""" |
| | |
| | def _parse_llm_response(self, response, original_query: str) -> Dict[str, Any]: |
| | """Parse LLM response and extract structured analysis.""" |
| | try: |
| | |
| | if hasattr(response, 'content'): |
| | |
| | response_text = response.content |
| | elif hasattr(response, 'text'): |
| | |
| | response_text = response.text |
| | elif isinstance(response, str): |
| | |
| | response_text = response |
| | else: |
| | |
| | response_text = str(response) |
| | |
| | print(f"π Raw response text: {response_text[:500]}...") |
| | |
| | |
| | import re |
| | |
| | |
| | response_text = re.sub(r'```json\s*', '', response_text) |
| | response_text = re.sub(r'```\s*$', '', response_text) |
| | response_text = response_text.strip() |
| | |
| | |
| | json_patterns = [ |
| | r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', |
| | r'\{.*\}', |
| | ] |
| | |
| | analysis = None |
| | |
| | |
| | if response_text.strip().startswith('{') and response_text.strip().endswith('}'): |
| | try: |
| | analysis = json.loads(response_text.strip()) |
| | print(f"β
Successfully parsed JSON via direct parsing") |
| | except json.JSONDecodeError: |
| | print(f"β οΈ Direct JSON parsing failed, trying pattern matching") |
| | |
| | |
| | if not analysis: |
| | for pattern in json_patterns: |
| | json_match = re.search(pattern, response_text, re.DOTALL) |
| | if json_match: |
| | try: |
| | json_text = json_match.group().strip() |
| | print(f"π Extracted JSON: {json_text[:200]}...") |
| | analysis = json.loads(json_text) |
| | print(f"β
Successfully parsed JSON analysis via pattern matching") |
| | break |
| | except json.JSONDecodeError as json_error: |
| | print(f"β JSON decode error with pattern {pattern}: {json_error}") |
| | continue |
| | |
| | |
| | if not analysis: |
| | try: |
| | analysis = self._extract_json_with_balanced_braces(response_text) |
| | if analysis: |
| | print(f"β
Successfully parsed JSON via balanced brace extraction") |
| | except Exception as brace_error: |
| | print(f"β Balanced brace extraction failed: {brace_error}") |
| | |
| | if analysis: |
| | return analysis |
| | else: |
| | raise ValueError("No valid JSON found in response") |
| | |
| | except Exception as e: |
| | print(f"β Error parsing LLM response: {e}") |
| | print(f"π Falling back to keyword-based analysis") |
| | |
| | return { |
| | "analysis": { |
| | "query_type": "simple", |
| | "primary_intent": "general query", |
| | "complexity_score": 0.5, |
| | "location_extracted": None, |
| | "entities": [], |
| | "reasoning": f"LLM parsing failed: {str(e)}" |
| | }, |
| | "tasks": [ |
| | { |
| | "task_id": "web_fallback", |
| | "agent_id": "web_agent", |
| | "description": original_query, |
| | "parameters": {"query": original_query, "category": None}, |
| | "confidence": 0.5, |
| | "priority": 1 |
| | } |
| | ] |
| | } |
| | |
| | def _extract_json_with_balanced_braces(self, text: str) -> Optional[Dict[str, Any]]: |
| | """Extract JSON by finding balanced braces manually.""" |
| | import json |
| | |
| | |
| | start_idx = text.find('{') |
| | if start_idx == -1: |
| | return None |
| | |
| | |
| | brace_count = 0 |
| | end_idx = start_idx |
| | in_string = False |
| | escape_next = False |
| | |
| | for i, char in enumerate(text[start_idx:], start_idx): |
| | if escape_next: |
| | escape_next = False |
| | continue |
| | |
| | if char == '\\': |
| | escape_next = True |
| | continue |
| | |
| | if char == '"' and not escape_next: |
| | in_string = not in_string |
| | continue |
| | |
| | if not in_string: |
| | if char == '{': |
| | brace_count += 1 |
| | elif char == '}': |
| | brace_count -= 1 |
| | if brace_count == 0: |
| | end_idx = i |
| | break |
| | |
| | if brace_count == 0: |
| | json_text = text[start_idx:end_idx + 1] |
| | try: |
| | return json.loads(json_text) |
| | except json.JSONDecodeError: |
| | return None |
| | |
| | return None |
| | |
| | def _create_subtasks(self, analysis: Dict[str, Any], original_query: str) -> List[SubTask]: |
| | """Convert LLM analysis into SubTask objects.""" |
| | subtasks = [] |
| | |
| | tasks = analysis.get("tasks", []) |
| | if not tasks: |
| | |
| | tasks = [{ |
| | "task_id": "fallback_001", |
| | "agent_id": "web_agent", |
| | "description": original_query, |
| | "parameters": {"query": original_query}, |
| | "confidence": 0.5, |
| | "priority": 1 |
| | }] |
| | |
| | for task_data in tasks: |
| | agent_id = task_data.get("agent_id", "web_agent") |
| | |
| | |
| | task_type_mapping = { |
| | "sentiment_agent": TaskType.SENTIMENT_ANALYSIS, |
| | "location_agent": TaskType.LOCATION_SEARCH, |
| | "restaurant_agent": TaskType.RESTAURANT_SEARCH, |
| | "hiking_agent": TaskType.HIKING_SEARCH, |
| | "web_agent": TaskType.WEB_SEARCH |
| | } |
| | |
| | task_type = task_type_mapping.get(agent_id, TaskType.WEB_SEARCH) |
| | |
| | subtask = SubTask( |
| | id=task_data.get("task_id", str(uuid.uuid4())), |
| | task_type=task_type, |
| | description=task_data.get("description", original_query), |
| | parameters=task_data.get("parameters", {"query": original_query}), |
| | agent_id=agent_id, |
| | confidence=task_data.get("confidence", 0.5) |
| | ) |
| | |
| | subtasks.append(subtask) |
| | |
| | return subtasks |
| | |
| | def _fallback_decomposition(self, user_query: str) -> List[SubTask]: |
| | """Fallback decomposition using simple keyword matching.""" |
| | print(f"π Using fallback decomposition for: '{user_query}'") |
| | query_lower = user_query.lower() |
| | |
| | |
| | if any(word in query_lower for word in ["sentiment", "feeling", "opinion", "emotion", "analyze"]): |
| | print(f"π Detected sentiment analysis request") |
| | |
| | |
| | import re |
| | text_to_analyze = user_query |
| | |
| | |
| | quote_pattern = r"['\"]([^'\"]+)['\"]" |
| | quote_match = re.search(quote_pattern, user_query) |
| | if quote_match: |
| | text_to_analyze = quote_match.group(1) |
| | print(f"π Extracted quoted text: '{text_to_analyze}'") |
| | |
| | |
| | elif "analyze sentiment:" in query_lower: |
| | parts = user_query.split(":", 1) |
| | if len(parts) > 1: |
| | text_to_analyze = parts[1].strip().strip("'\"") |
| | print(f"π Extracted text after colon: '{text_to_analyze}'") |
| | |
| | |
| | elif "sentiment" in query_lower: |
| | |
| | sentiment_patterns = [ |
| | r"sentiment[:\s]+['\"]?([^'\"]+)['\"]?", |
| | r"analyze[:\s]+['\"]?([^'\"]+)['\"]?\s+sentiment", |
| | r"['\"]([^'\"]+)['\"].*sentiment" |
| | ] |
| | |
| | for pattern in sentiment_patterns: |
| | match = re.search(pattern, user_query, re.IGNORECASE) |
| | if match: |
| | text_to_analyze = match.group(1).strip() |
| | print(f"π Extracted text via pattern: '{text_to_analyze}'") |
| | break |
| | |
| | print(f"π― Final text for sentiment analysis: '{text_to_analyze}'") |
| | |
| | return [SubTask( |
| | id=str(uuid.uuid4()), |
| | task_type=TaskType.SENTIMENT_ANALYSIS, |
| | description=f"Analyze sentiment: {text_to_analyze}", |
| | parameters={"text": text_to_analyze}, |
| | agent_id="sentiment_agent", |
| | confidence=0.8 |
| | )] |
| | elif any(word in query_lower for word in ["hiking", "trail", "trails", "trek", "trekking", "hike", "hikes"]): |
| | |
| | import re |
| | |
| | |
| | location_patterns = [ |
| | r"(?:in|at|near|around|close to)\s+([a-zA-Z\s,]+?)(?:\s+within|\s+\d|$|\.|,)", |
| | r"([A-Z][a-zA-Z\s]+?)(?:\s+within|\s+\d|$)" |
| | ] |
| | |
| | location = None |
| | for pattern in location_patterns: |
| | location_match = re.search(pattern, user_query) |
| | if location_match: |
| | location = location_match.group(1).strip() |
| | break |
| | |
| | if not location: |
| | location = user_query |
| | |
| | |
| | difficulty = None |
| | if "easy" in query_lower: |
| | difficulty = "easy" |
| | elif "moderate" in query_lower: |
| | difficulty = "moderate" |
| | elif any(word in query_lower for word in ["hard", "difficult", "challenging"]): |
| | difficulty = "hard" |
| | elif any(word in query_lower for word in ["very hard", "extreme", "strenuous"]): |
| | difficulty = "very_hard" |
| | |
| | |
| | max_distance = 30 |
| | distance_match = re.search(r"within\s+(\d+)\s*(?:mile|miles|mi)", query_lower) |
| | if distance_match: |
| | max_distance = int(distance_match.group(1)) |
| | |
| | return [SubTask( |
| | id=str(uuid.uuid4()), |
| | task_type=TaskType.HIKING_SEARCH, |
| | description=f"Find hiking trails: {user_query}", |
| | parameters={"location": location, "difficulty": difficulty, "max_distance": max_distance}, |
| | agent_id="hiking_agent", |
| | confidence=0.8 |
| | )] |
| | elif any(word in query_lower for word in ["hotel", "accommodation", "stay", "place"]): |
| | return [SubTask( |
| | id=str(uuid.uuid4()), |
| | task_type=TaskType.LOCATION_SEARCH, |
| | description=f"Find accommodations: {user_query}", |
| | parameters={"query": user_query, "max_distance": 20}, |
| | agent_id="location_agent", |
| | confidence=0.7 |
| | )] |
| | elif any(word in query_lower for word in ["restaurant", "food", "dining", "eat"]): |
| | return [SubTask( |
| | id=str(uuid.uuid4()), |
| | task_type=TaskType.RESTAURANT_SEARCH, |
| | description=f"Find restaurants: {user_query}", |
| | parameters={"query": user_query, "cuisine": None}, |
| | agent_id="restaurant_agent", |
| | confidence=0.7 |
| | )] |
| | else: |
| | |
| | return [SubTask( |
| | id=str(uuid.uuid4()), |
| | task_type=TaskType.WEB_SEARCH, |
| | description=f"Web search: {user_query}", |
| | parameters={"query": user_query, "category": None}, |
| | agent_id="web_agent", |
| | confidence=0.6 |
| | )] |
| |
|
| | async def test_ticker_detection(self, test_queries: List[str] = None) -> Dict[str, str]: |
| | """Test ticker detection on various queries to help debug issues.""" |
| | if test_queries is None: |
| | test_queries = [ |
| | "What's the current stock price of NVDA?", |
| | "NVDA stock price", |
| | "Get NVIDIA stock price", |
| | "What is TSLA trading at?", |
| | "Apple stock price", |
| | "AAPL current price" |
| | ] |
| | |
| | results = {} |
| | print("π§ͺ Testing ticker detection...") |
| | |
| | for query in test_queries: |
| | detected = await self.detect_ticker_symbol(query) |
| | results[query] = detected |
| | print(f" '{query}' β '{detected}'") |
| | |
| | return results |
| |
|
| | async def detect_ticker_symbol(self, user_query: str) -> str: |
| | """ |
| | Use LLM to detect and extract ticker symbols from financial queries. |
| | """ |
| | try: |
| | |
| | import re |
| | |
| | print(f"π Analyzing query for ticker: '{user_query}'") |
| | |
| | |
| | ticker_patterns = [ |
| | r'\b([A-Z]{1,5})\b(?:\s+stock|\s+price|\s+quote)', |
| | r'\bof\s+([A-Z]{2,5})\b', |
| | r'\b([A-Z]{2,5})\s*\??\s*$', |
| | r'\b([A-Z]{2,5})\b(?=\s)', |
| | r'\b([A-Z]{2,5})\b', |
| | ] |
| | |
| | |
| | company_tickers = { |
| | 'nvidia': 'NVDA', |
| | 'apple': 'AAPL', |
| | 'tesla': 'TSLA', |
| | 'microsoft': 'MSFT', |
| | 'google': 'GOOGL', |
| | 'amazon': 'AMZN', |
| | 'meta': 'META', |
| | 'facebook': 'META', |
| | 'spy': 'SPY', |
| | 'qqq': 'QQQ' |
| | } |
| | |
| | query_lower = user_query.lower() |
| | |
| | |
| | for i, pattern in enumerate(ticker_patterns): |
| | matches = re.findall(pattern, user_query, re.IGNORECASE) |
| | print(f" Pattern {i+1} ('{pattern}'): {matches}") |
| | for match in matches: |
| | if len(match) >= 2 and match.upper() not in ['THE', 'AND', 'FOR', 'ARE', 'BUT', 'NOT', 'YOU', 'ALL', 'CAN', 'HER', 'WAS', 'ONE', 'OUR', 'HAD', 'BUT', 'WHAT', 'BEEN', 'THAT', 'WITH', 'THIS']: |
| | print(f"π― Direct ticker pattern match found: {match.upper()}") |
| | return match.upper() |
| | |
| | |
| | for company, ticker in company_tickers.items(): |
| | if company in query_lower: |
| | print(f"π’ Company name match found: {company} β {ticker}") |
| | return ticker |
| | |
| | |
| | prompt = f""" |
| | You are a financial assistant. Your task is to identify stock ticker symbols in queries. |
| | |
| | Query: "{user_query}" |
| | |
| | If this query mentions a company or stock, return ONLY the ticker symbol (e.g., "AAPL", "TSLA", "NVDA"). |
| | If no ticker can be identified, return "UNKNOWN". |
| | |
| | Examples: |
| | - "Apple stock price" β AAPL |
| | - "Tesla earnings" β TSLA |
| | - "NVIDIA performance" β NVDA |
| | - "Microsoft news" β MSFT |
| | - "What's the current stock price of NVDA?" β NVDA |
| | - "weather forecast" β UNKNOWN |
| | |
| | Response (ticker only):""" |
| |
|
| | if self.model_name == "anthropic" and self.model: |
| | response = self.model([{"role": "user", "content": prompt}]) |
| | ticker = str(response).strip().upper() |
| | |
| | |
| | if ticker and ticker != "UNKNOWN" and len(ticker) <= 5 and ticker.isalpha(): |
| | print(f"π€ LLM ticker detection: {ticker}") |
| | return ticker |
| | |
| | print(f"β No ticker detected for query: {user_query}") |
| | return "UNKNOWN" |
| | |
| | except Exception as e: |
| | print(f"β Ticker detection failed: {e}") |
| | return "UNKNOWN" |
| |
|
| | async def enhance_financial_query(self, user_query: str) -> str: |
| | """Enhance financial queries with ticker symbol detection.""" |
| | ticker_result = await self.detect_ticker_symbol(user_query) |
| | |
| | if ticker_result != "UNKNOWN": |
| | |
| | enhanced_query = f"{ticker_result} stock price quote market data" |
| | print(f"π― Enhanced financial query: '{user_query}' β '{enhanced_query}' (ticker: {ticker_result})") |
| | return enhanced_query |
| | else: |
| | print(f"π Using original query: '{user_query}' (no ticker detected)") |
| | return user_query |
| |
|
| | class MCPClientManager: |
| | """Enhanced MCP Client Manager with LLM-powered task decomposition.""" |
| | |
| | def __init__(self, server_url: str = "http://localhost:7861/gradio_api/mcp/sse", model_name: str = "anthropic"): |
| | self.server_url = server_url |
| | self.model_name = model_name |
| | self.task_decomposer = LLMTaskDecomposer(model_name) |
| | self.mcp_client = None |
| | self.session_id = str(uuid.uuid4()) |
| | self.is_connected = False |
| | self.available_tools = [] |
| |
|
| | async def connect_to_server(self) -> bool: |
| | """Connect to an already running MCP server using smolagents MCPClient.""" |
| | max_retries = 3 if "localhost" in self.server_url else 2 |
| | retry_delay = 2 |
| | |
| | for attempt in range(max_retries): |
| | try: |
| | if attempt > 0: |
| | print(f"π Retry attempt {attempt + 1}/{max_retries}") |
| | await asyncio.sleep(retry_delay) |
| | |
| | print(f"π Attempting to connect to MCP server at: {self.server_url}") |
| | |
| | |
| | timeout = 15 if "localhost" in self.server_url else 45 |
| | |
| | |
| | self.mcp_client = MCPClient({ |
| | "url": self.server_url, |
| | "transport": "sse", |
| | "timeout": timeout |
| | }) |
| | |
| | |
| | try: |
| | print("π Fetching available tools...") |
| | self.available_tools = self.mcp_client.get_tools() |
| | tool_names = [tool.name for tool in self.available_tools] |
| | print(f"β
Connected to MCP server. Available tools: {tool_names}") |
| | |
| | |
| | if self.available_tools: |
| | print("π Tool Details:") |
| | for tool in self.available_tools: |
| | print(f" β’ {tool.name}") |
| | else: |
| | print("β οΈ Warning: No tools found on the server") |
| | |
| | |
| | if tool_names and any("_" in name for name in tool_names): |
| | print("π§ Detected prefixed tool names - using flexible matching") |
| | |
| | except Exception as tools_error: |
| | print(f"β οΈ Warning: Connected but failed to get tools: {tools_error}") |
| | self.available_tools = [] |
| | |
| | self.is_connected = True |
| | return True |
| | |
| | except Exception as e: |
| | error_msg = str(e) |
| | if "timeout" in error_msg.lower() or "connection" in error_msg.lower(): |
| | print(f"β±οΈ Connection attempt {attempt + 1} failed: {error_msg}") |
| | else: |
| | print(f"β Connection attempt {attempt + 1} failed: {error_msg}") |
| | |
| | if attempt == max_retries - 1: |
| | print(f"β Failed to connect to MCP server after {max_retries} attempts") |
| | print(f"π‘ Connection troubleshooting for: {self.server_url}") |
| | |
| | if "localhost" in self.server_url: |
| | print("π LOCAL SERVER ISSUES:") |
| | print(" β’ Make sure the MCP server is running locally") |
| | print(" β’ Check if port 7861 is available") |
| | print(" β’ Try running: python server.py in the mcp_server directory") |
| | else: |
| | print("π REMOTE SERVER ISSUES:") |
| | if "hf.space" in self.server_url: |
| | print(" β’ The Hugging Face Space might be PRIVATE (not publicly accessible)") |
| | print(" β’ Make the Space PUBLIC in HF settings, or") |
| | print(" β’ Use a local server instead") |
| | print(" β’ Check your internet connection") |
| | print(" β’ Verify the server URL is correct and accessible") |
| | |
| | await self._cleanup() |
| | |
| | return False |
| | |
| | async def execute_subtask(self, subtask: SubTask) -> Dict[str, Any]: |
| | """Execute a subtask using the MCP tool.""" |
| | if not self.is_connected or not self.mcp_client: |
| | return {"error": "Not connected to MCP server"} |
| | |
| | try: |
| | agent = self.task_decomposer.agents.get(subtask.agent_id) |
| | if not agent: |
| | return {"error": f"Agent {subtask.agent_id} not found"} |
| | |
| | tool_name = agent.tool_name |
| | |
| | |
| | available_tool_names = [tool.name for tool in self.available_tools] |
| | print(f"π Looking for tool '{tool_name}' among available tools: {available_tool_names}") |
| | |
| | |
| | tool = None |
| | for available_tool in self.available_tools: |
| | |
| | if available_tool.name == tool_name: |
| | tool = available_tool |
| | print(f"β
Found exact match: {available_tool.name}") |
| | break |
| | |
| | elif available_tool.name.endswith(f"_{tool_name}") or available_tool.name.endswith(tool_name): |
| | tool = available_tool |
| | print(f"β
Found suffix match: {available_tool.name} matches {tool_name}") |
| | break |
| | |
| | if not tool: |
| | return { |
| | "error": f"Tool {tool_name} not available on server", |
| | "available_tools": available_tool_names, |
| | "requested_tool": tool_name, |
| | "agent_id": subtask.agent_id |
| | } |
| | |
| | |
| | if tool_name == "web_search" and subtask.agent_id == "web_agent": |
| | |
| | original_query = subtask.parameters.get("query", "") |
| | enhanced_query = await self.task_decomposer.enhance_financial_query(original_query) |
| | subtask.parameters["query"] = enhanced_query |
| | print(f"π‘ Web search query enhanced: '{original_query}' β '{enhanced_query}'") |
| | |
| | |
| | filtered_params = self._filter_tool_parameters(tool_name, subtask.parameters) |
| | print(f"π§ Filtered parameters for {tool_name}: {filtered_params}") |
| | |
| | |
| | try: |
| | result = tool(**filtered_params) |
| | |
| | |
| | if isinstance(result, str): |
| | try: |
| | parsed_result = json.loads(result) |
| | except json.JSONDecodeError: |
| | parsed_result = {"result": result} |
| | elif isinstance(result, dict): |
| | parsed_result = result |
| | elif hasattr(result, 'content'): |
| | |
| | parsed_result = {"result": str(result.content)} |
| | else: |
| | parsed_result = {"result": str(result)} |
| | |
| | subtask.status = "completed" |
| | subtask.result = parsed_result |
| | return parsed_result |
| | |
| | except Exception as tool_error: |
| | return {"error": f"Tool execution error: {str(tool_error)}", "tool_name": tool_name} |
| | |
| | except Exception as e: |
| | subtask.status = "failed" |
| | return {"error": f"Subtask execution failed: {str(e)}", "subtask_id": subtask.id} |
| | |
| | def _filter_tool_parameters(self, tool_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]: |
| | """Filter and map parameters based on tool requirements.""" |
| | |
| | |
| | tool_param_mappings = { |
| | "web_search": { |
| | "allowed_params": ["query", "max_results"], |
| | "param_mapping": { |
| | "search_query": "query", |
| | "search_term": "query", |
| | "q": "query", |
| | "data_type": None, |
| | "category": None |
| | } |
| | }, |
| | "sentiment_analysis": { |
| | "allowed_params": ["text"], |
| | "param_mapping": { |
| | "input_text": "text", |
| | "content": "text" |
| | } |
| | }, |
| | "place_search": { |
| | "allowed_params": ["query", "max_distance"], |
| | "param_mapping": { |
| | "location": "query", |
| | "search_query": "query", |
| | "distance": "max_distance" |
| | } |
| | }, |
| | "restaurant_search": { |
| | "allowed_params": ["query", "cuisine"], |
| | "param_mapping": { |
| | "location": "query", |
| | "search_query": "query", |
| | "cuisine_type": "cuisine" |
| | } |
| | }, |
| | "hiking_search": { |
| | "allowed_params": ["location", "difficulty", "max_distance"], |
| | "param_mapping": { |
| | "query": "location", |
| | "search_query": "location", |
| | "skill_level": "difficulty" |
| | } |
| | } |
| | } |
| | |
| | mapping_config = tool_param_mappings.get(tool_name, { |
| | "allowed_params": ["query"], |
| | "param_mapping": {} |
| | }) |
| | |
| | filtered_params = {} |
| | |
| | for param_key, param_value in parameters.items(): |
| | |
| | mapped_key = mapping_config["param_mapping"].get(param_key, param_key) |
| | |
| | |
| | if mapped_key is None: |
| | continue |
| | |
| | |
| | if param_value is None or param_value == "": |
| | continue |
| | |
| | |
| | if mapped_key in mapping_config["allowed_params"]: |
| | filtered_params[mapped_key] = param_value |
| | |
| | |
| | if tool_name in ["web_search"] and "query" not in filtered_params: |
| | |
| | if parameters: |
| | filtered_params["query"] = str(list(parameters.values())[0]) |
| | elif tool_name == "hiking_search" and "location" not in filtered_params: |
| | |
| | if "query" in parameters and parameters["query"]: |
| | filtered_params["location"] = str(parameters["query"]) |
| | elif tool_name == "restaurant_search" and "query" not in filtered_params: |
| | |
| | if "location" in parameters and parameters["location"]: |
| | filtered_params["query"] = str(parameters["location"]) |
| | elif tool_name == "place_search" and "query" not in filtered_params: |
| | |
| | if "location" in parameters and parameters["location"]: |
| | filtered_params["query"] = str(parameters["location"]) |
| | |
| | return filtered_params |
| | |
| | def test_tool_matching(self) -> str: |
| | """Test tool matching logic for debugging purposes.""" |
| | if not self.available_tools: |
| | return "β No tools available to test" |
| | |
| | results = [] |
| | results.append("π§ͺ Tool Matching Test Results:") |
| | results.append("") |
| | |
| | |
| | for agent_id, agent in self.task_decomposer.agents.items(): |
| | tool_name = agent.tool_name |
| | results.append(f"π Testing agent '{agent_id}' looking for tool '{tool_name}':") |
| | |
| | |
| | exact_match = None |
| | suffix_match = None |
| | |
| | for available_tool in self.available_tools: |
| | if available_tool.name == tool_name: |
| | exact_match = available_tool.name |
| | break |
| | elif available_tool.name.endswith(f"_{tool_name}") or available_tool.name.endswith(tool_name): |
| | suffix_match = available_tool.name |
| | |
| | if exact_match: |
| | results.append(f" β
Exact match found: {exact_match}") |
| | elif suffix_match: |
| | results.append(f" β
Suffix match found: {suffix_match}") |
| | else: |
| | results.append(f" β No match found") |
| | |
| | results.append("") |
| | |
| | return "\n".join(results) |
| | |
| | async def process_query(self, user_query: str) -> Dict[str, Any]: |
| | """Process user query through LLM-powered task decomposition and agent routing.""" |
| | try: |
| | |
| | if not self.is_connected: |
| | success = await self.connect_to_server() |
| | if not success: |
| | return { |
| | "query": user_query, |
| | "error": "Failed to connect to MCP server. Please ensure the server is running separately.", |
| | "status": "failed" |
| | } |
| | |
| | |
| | subtasks = await self.task_decomposer.decompose_query(user_query) |
| | |
| | |
| | results = [] |
| | for subtask in subtasks: |
| | result = await self.execute_subtask(subtask) |
| | results.append({ |
| | "subtask_id": subtask.id, |
| | "task_type": subtask.task_type.value, |
| | "agent": subtask.agent_id, |
| | "description": subtask.description, |
| | "confidence": subtask.confidence, |
| | "result": result |
| | }) |
| | |
| | |
| | response = { |
| | "query": user_query, |
| | "subtasks_count": len(subtasks), |
| | "subtasks": results, |
| | "status": "completed", |
| | "summary": self._generate_summary(user_query, results) |
| | } |
| | |
| | return response |
| | |
| | except Exception as e: |
| | print(f"β Failed to process query: {e}") |
| | return { |
| | "query": user_query, |
| | "error": f"Query processing failed: {str(e)}", |
| | "status": "failed" |
| | } |
| | |
| | def _generate_summary(self, query: str, results: List[Dict[str, Any]]) -> str: |
| | """Generate summary of all subtask results - now simplified since server handles formatting.""" |
| | try: |
| | if not results: |
| | return f"# π€ No Results\n\nNo results available for your query. Please try a different search term.\n\n---\n*π§ Powered by AI Task Decomposition*" |
| | |
| | |
| | for result in results: |
| | try: |
| | if isinstance(result, dict) and isinstance(result.get("result"), dict): |
| | |
| | if result["result"].get("summary"): |
| | return result["result"]["summary"] |
| | |
| | elif result["result"].get("result"): |
| | formatted_content = result["result"]["result"] |
| | if isinstance(formatted_content, str) and "π" in formatted_content: |
| | |
| | return formatted_content |
| | except (KeyError, TypeError, AttributeError) as e: |
| | print(f"β οΈ Warning: Error accessing result summary: {e}") |
| | continue |
| | |
| | |
| | summary_parts = [f"# π― Results for: *{query}*", ""] |
| | |
| | for i, result in enumerate(results, 1): |
| | try: |
| | task_type = result.get("task_type", "unknown") |
| | description = result.get("description", "No description") |
| | confidence = result.get("confidence", 0.5) |
| | |
| | result_data = result.get("result", {}) |
| | |
| | if isinstance(result_data, dict) and "error" not in result_data: |
| | summary_parts.append(f"## π Task {i}: {task_type.replace('_', ' ').title()}") |
| | summary_parts.append(f"**Confidence:** {confidence:.1%}") |
| | summary_parts.append("") |
| | |
| | |
| | if result_data.get("summary"): |
| | |
| | summary_parts.append(result_data["summary"]) |
| | elif result_data.get("result"): |
| | |
| | content = result_data["result"] |
| | if isinstance(content, str): |
| | |
| | formatted_content = content.replace('\\n', '\n').replace('\\t', '\t') |
| | summary_parts.append(formatted_content) |
| | else: |
| | |
| | summary_parts.append(self._format_result_content(content)) |
| | else: |
| | |
| | summary_parts.append(self._format_result_content(result_data)) |
| | |
| | summary_parts.append("") |
| | else: |
| | |
| | summary_parts.append(f"## β {task_type.replace('_', ' ').title()} - Error") |
| | if isinstance(result_data, dict): |
| | error_msg = result_data.get('error', 'Unknown error occurred') |
| | else: |
| | error_msg = str(result_data) |
| | summary_parts.append(f"**Issue:** {error_msg}") |
| | summary_parts.append("") |
| | |
| | except Exception as e: |
| | print(f"β οΈ Warning: Error processing result {i}: {e}") |
| | summary_parts.append(f"## β Task {i} - Processing Error") |
| | summary_parts.append(f"**Issue:** {str(e)}") |
| | summary_parts.append("") |
| | |
| | summary_parts.append("---") |
| | |
| | return "\n".join(summary_parts) |
| | |
| | except Exception as e: |
| | print(f"β Error in _generate_summary: {e}") |
| | return f"# β Summary Generation Error\n\nFailed to generate summary: {str(e)}\n\n---\n*π§ Powered by AI Task Decomposition*" |
| | |
| | async def _cleanup(self): |
| | """Clean up resources.""" |
| | if self.mcp_client: |
| | try: |
| | self.mcp_client.disconnect() |
| | except: |
| | pass |
| | self.mcp_client = None |
| | |
| | self.is_connected = False |
| | |
| | async def disconnect(self): |
| | """Disconnect from MCP server.""" |
| | await self._cleanup() |
| | print("π Disconnected from MCP server") |
| |
|
| | def create_mcp_client_interface(server_url: str = "http://localhost:7861/gradio_api/mcp/sse", model_name: str = "anthropic"): |
| | """Create the Gradio interface for MCP Client with LLM-powered task decomposition.""" |
| | |
| | |
| | async def process_query(query: str): |
| | """Process user query through MCP with LLM decomposition.""" |
| | if not query.strip(): |
| | return "Please enter a query to process." |
| | |
| | try: |
| | |
| | client_manager = MCPClientManager(server_url, model_name) |
| | |
| | |
| | result = await client_manager.process_query(query) |
| | |
| | |
| | await client_manager.disconnect() |
| | |
| | if result and 'summary' in result: |
| | return result['summary'] |
| | else: |
| | return "β No results found or error occurred during processing." |
| | |
| | except Exception as e: |
| | return f"β Error processing query: {str(e)}" |
| | |
| | |
| | available_models = [] |
| | |
| | if ANTHROPIC_API_KEY: |
| | available_models.append(("π€ Claude Sonnet 4 via Anthropic", "anthropic")) |
| | |
| | if not available_models: |
| | available_models.append(("β No API Keys Configured", "none")) |
| | |
| | |
| | css = """ |
| | .gradio-container { |
| | max-width: 1200px !important; |
| | margin: auto !important; |
| | } |
| | .header-container { |
| | background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); |
| | padding: 2rem; |
| | border-radius: 15px; |
| | margin-bottom: 2rem; |
| | color: white; |
| | text-align: center; |
| | } |
| | .model-info { |
| | background: #f8fafc; |
| | border: 1px solid #e2e8f0; |
| | border-radius: 10px; |
| | padding: 1rem; |
| | margin: 1rem 0; |
| | } |
| | .example-btn { |
| | margin: 0.25rem !important; |
| | background: linear-gradient(45deg, #4f46e5, #7c3aed) !important; |
| | border: none !important; |
| | color: white !important; |
| | } |
| | .example-btn:hover { |
| | transform: translateY(-2px); |
| | box-shadow: 0 4px 12px rgba(79, 70, 229, 0.4) !important; |
| | } |
| | .input-section { |
| | background: #ffffff; |
| | border: 1px solid #e5e7eb; |
| | border-radius: 12px; |
| | padding: 1.5rem; |
| | margin: 1rem 0; |
| | box-shadow: 0 2px 8px rgba(0,0,0,0.05); |
| | } |
| | .results-container { |
| | background: #ffffff; |
| | border: 1px solid #e5e7eb; |
| | border-radius: 12px; |
| | padding: 1.5rem; |
| | margin: 1rem 0; |
| | box-shadow: 0 2px 8px rgba(0,0,0,0.05); |
| | } |
| | .control-buttons { |
| | margin-top: 1rem; |
| | gap: 1rem; |
| | } |
| | .markdown-content { |
| | line-height: 1.6; |
| | } |
| | """ |
| | |
| | with gr.Blocks(css=css, title="π VOYAGER AI") as demo: |
| | |
| | with gr.Column(elem_classes="header-container"): |
| | gr.HTML(""" |
| | <h1 style="margin: 0; font-size: 2.5rem; font-weight: bold;"> |
| | π VOYAGER AI |
| | </h1> |
| | <p style="margin: 0.5rem 0 0 0; font-size: 1.2rem; opacity: 0.9;"> |
| | Intelligent AI Assistant with Multi-Agent Coordination |
| | </p> |
| | """) |
| | |
| | |
| | with gr.Column(): |
| | |
| | with gr.Column(): |
| | gr.HTML(""" |
| | <h3 style="color: #1f2937; margin: 20px 0 15px 0; font-size: 18px; font-weight: 600;"> |
| | π‘ Quick Start - Try These Examples: |
| | </h3> |
| | """) |
| | |
| | with gr.Row(): |
| | with gr.Column(scale=1): |
| | sentiment_btn = gr.Button("π Analyze Sentiment", elem_classes=["example-btn"]) |
| | hiking_btn = gr.Button("ποΈ Hiking trails near Denver", elem_classes=["example-btn"]) |
| | with gr.Column(scale=1): |
| | stock_btn = gr.Button("π Stock Prices", elem_classes=["example-btn"]) |
| | news_btn = gr.Button("π° Latest News", elem_classes=["example-btn"]) |
| | with gr.Column(scale=1): |
| | hotel_btn = gr.Button("π¨ Find Hotels", elem_classes=["example-btn"]) |
| | restaurant_btn = gr.Button("π½οΈ Find Restaurants", elem_classes=["example-btn"]) |
| | |
| | |
| | with gr.Column(elem_classes="input-section"): |
| | |
| | query_input = gr.Textbox( |
| | placeholder="π¬ Ask me anything... (e.g., 'What's NVIDIA's stock price?' or 'Find hotels and restaurants in New york')", |
| | lines=3, |
| | label="Your Query", |
| | show_label=False, |
| | container=False |
| | ) |
| | |
| | |
| | with gr.Row(elem_classes="control-buttons"): |
| | submit_btn = gr.Button( |
| | "π§ Analyze & Execute Query", |
| | variant="primary", |
| | size="lg", |
| | scale=2 |
| | ) |
| | clear_btn = gr.Button( |
| | "π Clear All", |
| | variant="secondary", |
| | size="lg", |
| | scale=1 |
| | ) |
| | |
| | |
| | with gr.Column(elem_classes="results-container"): |
| | gr.HTML(""" |
| | <h3 style="color: #1f2937; margin: 0 0 20px 0; font-size: 18px; font-weight: 600;"> |
| | π― AI Results & Analysis |
| | </h3> |
| | """) |
| | |
| | results_output = gr.Markdown( |
| | value=f"""**Welcome to VOYAGER AI!** π§ |
| | |
| | **π― How it works:** |
| | 1. **Try one of the examples above** or type your question naturally |
| | 2. **Click "Analyze & Execute"** to get intelligent results |
| | |
| | **β¨ Features:** |
| | β’ π **Smart Query Analysis** - AI understands your intent |
| | β’ π **Task Decomposition** - Complex queries broken down into subtasks |
| | β’ π€ **Agent Routing** - Specialized agents for different tasks |
| | β’ β‘ **Real-time Data** - Live web search and current information |
| | β’ π¨ **Professional Results** - Clean, formatted responses |
| | |
| | **π Ready to start?** Try one of the example buttons above or type your own query! |
| | |
| | --- |
| | π€ **Current Model:** {available_models[0][0] if available_models[0][1] != "none" else "No API Keys Configured"}""", |
| | show_label=False, |
| | container=False, |
| | elem_classes=["markdown-content"] |
| | ) |
| | |
| | |
| | submit_btn.click( |
| | process_query, |
| | inputs=[query_input], |
| | outputs=[results_output] |
| | ) |
| | |
| | clear_btn.click( |
| | fn=lambda: ("", """**Interface Cleared!** π§Ή |
| | |
| | Ready for your next query. Try the example buttons above or ask me anything! |
| | |
| | π‘ **Quick Tips:** |
| | - Try asking about stock prices, weather, news, or travel |
| | - Use natural language - no need for specific commands |
| | - Complex queries are automatically broken down into tasks"""), |
| | outputs=[query_input, results_output] |
| | ) |
| | |
| | query_input.submit( |
| | process_query, |
| | inputs=[query_input], |
| | outputs=[results_output] |
| | ) |
| | |
| | |
| | sentiment_btn.click( |
| | fn=lambda: "Analyze sentiment: 'I absolutely love this new AI technology - it's revolutionary and amazing!'", |
| | outputs=query_input |
| | ) |
| | |
| | hiking_btn.click( |
| | fn=lambda: "Find moderate hiking trails near Denver", |
| | outputs=query_input |
| | ) |
| | |
| | stock_btn.click( |
| | fn=lambda: "What's the current stock price of SPY?", |
| | outputs=query_input |
| | ) |
| | |
| | news_btn.click( |
| | fn=lambda: "Latest news about artificial intelligence and technology", |
| | outputs=query_input |
| | ) |
| | |
| | hotel_btn.click( |
| | fn=lambda: "Find luxury hotels in New york", |
| | outputs=query_input |
| | ) |
| | |
| | restaurant_btn.click( |
| | fn=lambda: "Best Italian restaurants in New York", |
| | outputs=query_input |
| | ) |
| | |
| | return demo |
| |
|
| | async def main(): |
| | """Main entry point for the LLM-Powered MCP Client.""" |
| | |
| | setup_environment() |
| | |
| | parser = argparse.ArgumentParser(description="LLM-Powered MCP Client with Intelligent Task Decomposition") |
| | parser.add_argument( |
| | "--server-url", |
| | default="https://agents-mcp-hackathon-test-mcp-server.hf.space/gradio_api/mcp/sse", |
| | help="MCP server URL (default: https://agents-mcp-hackathon-test-mcp-server.hf.space/gradio_api/mcp/sse)" |
| | ) |
| | parser.add_argument( |
| | "--local", |
| | action="store_true", |
| | help="Use local MCP server (http://localhost:7860/gradio_api/mcp/sse) instead of remote server" |
| | ) |
| | parser.add_argument( |
| | "--port", |
| | type=int, |
| | default=7862, |
| | help="Port to run the client interface (default: 7862)" |
| | ) |
| | parser.add_argument( |
| | "--model", |
| | default="anthropic", |
| | choices=["anthropic"], |
| | help="LLM model for task decomposition (default: anthropic)" |
| | ) |
| | |
| | args = parser.parse_args() |
| | |
| | |
| | if args.local: |
| | server_url = "http://localhost:7860/gradio_api/mcp/sse" |
| | print("π Local development mode enabled") |
| | else: |
| | server_url = args.server_url |
| | print("βοΈ Using remote MCP server") |
| | |
| | print("π VOYAGER AI - LLM-Powered Task Decomposition") |
| | print("π§ Intelligent query analysis and agent coordination") |
| | print("π€ Available Models:") |
| | if ANTHROPIC_API_KEY: |
| | print(" β’ β
Claude Sonnet 4 via Anthropic") |
| | else: |
| | print(" β’ β Claude Sonnet 4 (API key missing)") |
| | |
| | print("") |
| | print(f"π‘ MCP Server: {server_url}") |
| | print(f"π§ Default Model: {args.model}") |
| | print("β" * 50) |
| | |
| | |
| | if args.model == "anthropic" and not ANTHROPIC_API_KEY: |
| | print("β οΈ Warning: Anthropic model selected but API key not configured") |
| | |
| | |
| | demo = create_mcp_client_interface(server_url, args.model) |
| | |
| | print("π Interface ready! Select your model and ask anything naturally!") |
| | |
| | |
| | if args.local: |
| | |
| | demo.launch( |
| | server_name="0.0.0.0", |
| | server_port=args.port, |
| | share=True, |
| | show_error=True |
| | ) |
| | else: |
| | |
| | demo.launch( |
| | server_name="0.0.0.0", |
| | server_port=7860, |
| | show_error=True |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | try: |
| | asyncio.run(main()) |
| | except KeyboardInterrupt: |
| | print("π Client shutdown requested") |
| | except Exception as e: |
| | print(f"β Client error: {e}") |
| | sys.exit(1) |