Spaces:
Sleeping
Sleeping
| """ | |
| MCP Client with Gradio UI that connects to MCP Server tools. | |
| """ | |
| import gradio as gr | |
| import os | |
| from typing import List, Dict, Any, Optional | |
| import logging | |
| import asyncio | |
| import json | |
| import requests | |
| import re | |
| from bs4 import BeautifulSoup | |
| from urllib.parse import quote_plus, urljoin | |
| import time | |
| from smolagents import LiteLLMModel, ToolCallingAgent, ToolCollection, Tool | |
| from smolagents.mcp_client import MCPClient | |
| # Import API configuration | |
| from utils.api_config import api_config | |
# Optional dependency: the Hugging Face Inference Client is only needed for
# the DeepSeek path. HF_AVAILABLE records whether the import succeeded.
try:
    from huggingface_hub import InferenceClient
except ImportError:
    HF_AVAILABLE = False
    print("β οΈ Warning: huggingface_hub not available. Install with: pip install huggingface_hub")
else:
    HF_AVAILABLE = True

# API keys configuration - read from environment variables only.
FOURSQUARE_API_KEY = os.getenv('FOURSQUARE_API_KEY')
ANTHROPIC_API_KEY = os.getenv('ANTHROPIC_API_KEY')
# HF_API_KEY takes precedence; HUGGINGFACE_HUB_TOKEN is used when it is
# unset or empty.
HF_API_KEY = os.getenv('HF_API_KEY') or os.getenv('HUGGINGFACE_HUB_TOKEN')

# Module-level state shared by the handlers below.
mcp_client = None
agent = None
current_model_type = "anthropic"  # Default model
def setup_environment():
    """Print a report of which API credentials are configured.

    Reads the module-level FOURSQUARE_API_KEY, ANTHROPIC_API_KEY, HF_API_KEY
    and HF_AVAILABLE values, prints a warning plus a hint for each missing
    key, and then prints a summary table. Nothing is returned and no
    exception is raised for a missing key.
    """
    if FOURSQUARE_API_KEY:
        # Mirror the key into the process environment if it is absent there
        os.environ.setdefault('FOURSQUARE_API_KEY', FOURSQUARE_API_KEY)
    else:
        print("β οΈ Warning: FOURSQUARE_API_KEY not found in environment")
        print("π‘ Set environment variable: FOURSQUARE_API_KEY=your_foursquare_key")

    # Anthropic key: warn only, there is no hardcoded fallback
    if not ANTHROPIC_API_KEY:
        print("β οΈ Warning: ANTHROPIC_API_KEY not found in environment")
        print("π‘ Set environment variable: ANTHROPIC_API_KEY=your_anthropic_key")

    # Hugging Face token: either variable name is accepted
    if not HF_API_KEY:
        print("β οΈ Warning: HF_API_KEY or HUGGINGFACE_HUB_TOKEN not found in environment")
        print("π‘ Set environment variable: HF_API_KEY=your_hf_token or HUGGINGFACE_HUB_TOKEN=your_hf_token")

    print("\nπ API Configuration:")
    credential_rows = (
        ("Foursquare API Key", FOURSQUARE_API_KEY),
        ("Anthropic API Key", ANTHROPIC_API_KEY),
        ("Hugging Face Token", HF_API_KEY),
    )
    for label, value in credential_rows:
        state = 'β Configured' if value else 'β Missing'
        print(f"{label}: {state}")
    client_state = 'β Available' if HF_AVAILABLE else 'β Not installed'
    print(f"HF Inference Client: {client_state}")

    # The library alone is not enough for DeepSeek; a token is needed too
    if HF_AVAILABLE and not HF_API_KEY:
        print("β οΈ DeepSeek model will not work without HF_API_KEY")
def set_model_type(model_type: str):
    """Select which AI backend subsequent requests use.

    Args:
        model_type: Either "anthropic" or "deepseek".

    Raises:
        ValueError: If ``model_type`` is not one of the two supported values,
            or if "deepseek" is requested while huggingface_hub is not
            importable.

    Side effects: updates the module-level ``current_model_type`` and clears
    the cached ``agent`` and ``mcp_client`` so they are rebuilt on next use.
    """
    global current_model_type, agent, mcp_client

    supported_types = ("anthropic", "deepseek")
    if model_type not in supported_types:
        raise ValueError("Model type must be 'anthropic' or 'deepseek'")

    needs_hf = model_type == "deepseek"
    if needs_hf and not HF_AVAILABLE:
        raise ValueError("DeepSeek model requires huggingface_hub. Install with: pip install huggingface_hub")

    # Drop cached objects so they are recreated for the new model
    agent = None
    mcp_client = None
    current_model_type = model_type
    print(f"π€ Model type set to: {model_type}")
def create_ai_model():
    """Build the model object for the currently selected backend.

    Returns:
        A ``LiteLLMModel`` configured for Claude Sonnet 4 when
        ``current_model_type`` is "anthropic".

    Raises:
        ValueError: If the Anthropic key is missing, if the Hugging Face
            token is missing for DeepSeek, if the model type is unknown, and
            always at the end of the DeepSeek branch, which never returns a
            model because DeepSeek is served through the direct API instead.
        ImportError: If DeepSeek is selected and huggingface_hub is not
            importable.
    """
    selected = current_model_type

    if selected == "anthropic":
        print("π€ Using Anthropic Claude Sonnet 4")
        if not ANTHROPIC_API_KEY:
            raise ValueError("ANTHROPIC_API_KEY environment variable is required for Anthropic model")
        return LiteLLMModel(
            model_id="anthropic/claude-sonnet-4-20250514",
            temperature=0.2,
            api_key=ANTHROPIC_API_KEY,
        )

    if selected != "deepseek":
        raise ValueError(f"Unknown model type: {selected}")

    # DeepSeek: validate prerequisites, then signal that no LiteLLM model exists
    if not HF_AVAILABLE:
        raise ImportError("DeepSeek model requires huggingface_hub. Install with: pip install huggingface_hub")
    if not HF_API_KEY:
        raise ValueError("HF_API_KEY or HUGGINGFACE_HUB_TOKEN environment variable is required for DeepSeek model")
    print("π€ Using DeepSeek V3 via Hugging Face")
    # DeepSeek is not supported by the LiteLLM HF provider, so the caller is
    # expected to use the direct API path instead
    print("π Using direct DeepSeek API (LiteLLM not supported)")
    raise ValueError("DeepSeek requires direct API - using fallback")
class DeepSeekModel:
    """Custom model wrapper for DeepSeek via Hugging Face Inference Client.

    The instance is callable: it normalizes the incoming messages to a list
    of ``{"role", "content"}`` dicts, sends them to the chat-completions
    endpoint and returns the text of the first choice. Request failures are
    reported as an error string rather than raised.
    """

    def __init__(self, api_key: Optional[str] = None, temperature: float = 0.7):
        """Create the inference client.

        Args:
            api_key: Hugging Face token; falls back to the module-level
                ``HF_API_KEY`` when empty or omitted.
            temperature: Default sampling temperature for requests.

        Raises:
            ValueError: If no token is available, or if the client cannot be
                constructed (the underlying error is chained as the cause).
        """
        api_key = api_key or HF_API_KEY
        if not api_key:
            raise ValueError("HF_API_KEY or HUGGINGFACE_HUB_TOKEN environment variable is required for DeepSeek model")
        self.api_key = api_key
        self.temperature = temperature
        try:
            self.client = InferenceClient(
                provider="novita",
                api_key=api_key,
            )
            self.model_name = "deepseek-ai/DeepSeek-V3-0324"
        except Exception as e:
            # Chain the cause so the original traceback is not lost
            raise ValueError(f"Failed to initialize DeepSeek client: {str(e)}") from e
        # Security: never echo any part of the credential to stdout/logs
        print("β DeepSeek model initialized with token: ****")

    @staticmethod
    def _to_hf_messages(messages):
        """Normalize a str, a list of messages, or any other object to HF chat format."""
        if isinstance(messages, str):
            # Simple string input
            return [{"role": "user", "content": messages}]
        if not isinstance(messages, list):
            # Fallback for other formats
            return [{"role": "user", "content": str(messages)}]

        hf_messages = []
        for msg in messages:
            if hasattr(msg, 'content') and hasattr(msg, 'role'):
                # Message-like object exposing .role / .content
                hf_messages.append({"role": msg.role, "content": msg.content})
            elif isinstance(msg, dict):
                # Dicts are passed through unchanged
                hf_messages.append(msg)
            else:
                hf_messages.append({"role": "user", "content": str(msg)})
        return hf_messages

    def generate(self, messages, **kwargs):
        """Generate method required by smolagents framework; delegates to ``__call__``."""
        return self.__call__(messages, **kwargs)

    def __call__(self, messages, **kwargs):
        """Make a completion request compatible with smolagents.

        Args:
            messages: A string, a list of message objects/dicts, or any other
                object (stringified and sent as a single user message).
            **kwargs: Optional ``max_tokens`` (default 1000) and
                ``temperature`` (default: the instance temperature).

        Returns:
            The content of the first completion choice, or an error string
            when the response has no choices or the request fails.
        """
        try:
            hf_messages = self._to_hf_messages(messages)
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=hf_messages,
                max_tokens=kwargs.get('max_tokens', 1000),
                temperature=kwargs.get('temperature', self.temperature),
            )
            choices = getattr(completion, 'choices', None)
            if choices:
                return choices[0].message.content
            return "Error: No response from DeepSeek model"
        except Exception as e:
            # Deliberate best-effort: report the failure as text to the caller
            print(f"β DeepSeek model error: {e}")
            return f"Error using DeepSeek model: {str(e)}"

    # Additional methods that might be expected by smolagents
    def model_id(self):
        """Return model identifier (note: a method, not an attribute)."""
        return self.model_name

    def chat(self, messages, **kwargs):
        """Chat method as an alias to ``__call__``."""
        return self.__call__(messages, **kwargs)
# Simplified tools for the demo
class WebSearchTool(Tool):
    """Demo web-search tool that returns search-engine links for a query.

    No search is performed: ``forward`` only formats URLs for Google, Bing
    and DuckDuckGo with the query URL-encoded.
    """

    def __init__(self):
        # NOTE(review): the base Tool initializer is not called here; the
        # attributes below are set directly on the instance instead.
        self.name = "web_search"
        self.description = "Search for current information, news, weather, general knowledge, or any topic that requires up-to-date web information."
        self.input_type = "object"
        self.output_type = "object"
        self.inputs = {"query": {"type": "string", "description": "Search query"}}
        self.required_inputs = ["query"]
        self.is_initialized = True

    def forward(self, query: str) -> str:
        """Return a four-line text block with search links for ``query``."""
        encoded_query = quote_plus(query)
        guidance_lines = [
            f"π For '{query}', try these search resources:",
            f"β’ Google: https://www.google.com/search?q={encoded_query}",
            f"β’ Bing: https://www.bing.com/search?q={encoded_query}",
            f"β’ DuckDuckGo: https://duckduckgo.com/?q={encoded_query}",
        ]
        return "\n".join(guidance_lines)
def create_mcp_tools() -> List[Tool]:
    """Return the tool list handed to the agent: a single ``WebSearchTool``."""
    search_tool = WebSearchTool()
    return [search_tool]
def _model_labels(model_type: str):
    """Return the (display name, emoji, provider) triple used in response footers."""
    if model_type == "anthropic":
        return "Claude Sonnet 4", "π§ ", "Anthropic"
    return "DeepSeek V3", "π€", "Hugging Face"


def handle_agent_run(message: str) -> str:
    """Answer a user message with the currently selected AI model.

    DeepSeek requests are forwarded to ``handle_direct_deepseek_response``.
    Otherwise a ``ToolCallingAgent`` is created on first use, cached in the
    module-level ``agent``, and asked to run the message.

    Args:
        message: The user's question. ``None``, non-string and
            whitespace-only values are treated as empty input.

    Returns:
        The agent's answer followed by a footer naming the model, a prompt
        to enter a question for empty input, or a formatted error message.
        Exceptions from the agent are reported as text, never raised.
    """
    global agent

    # Guard against None / non-string payloads (e.g. from API clients) as
    # well as blank text, before anything else can fail on them
    if not isinstance(message, str) or not message.strip():
        return "Please enter a question or request."

    # Special handling for DeepSeek - use direct API only
    if current_model_type == "deepseek":
        return handle_direct_deepseek_response(message)

    model_name, model_emoji, model_provider = _model_labels(current_model_type)

    try:
        # Initialize agent if not already done (only for non-DeepSeek models)
        if not agent:
            tools = create_mcp_tools()
            model = create_ai_model()
            print(f"π Creating ToolCallingAgent with {len(tools)} tools")
            try:
                agent = ToolCallingAgent(
                    tools=tools,
                    model=model,
                )
                print("β Successfully created ToolCallingAgent")
            except Exception as e:
                print(f"β Failed to create ToolCallingAgent: {e}")
                return f"β οΈ Failed to initialize agent: {str(e)}"

        # Process the query
        result = agent.run(message)

        if not result or (isinstance(result, str) and result.strip() == ""):
            return f"""π€ **I couldn't process your request**
**Available capabilities:**
- π **Web Search**: Ask about current information, weather, news
- π¬ **General Chat**: Ask me anything!
Try rephrasing your question or be more specific about what you're looking for!
---
{model_emoji} **Response generated by**: {model_name} ({model_provider})"""

        # Format the response with model information
        return f"""{str(result)}
---
{model_emoji} **Response generated by**: {model_name} ({model_provider})
β±οΈ **Model Type**: {current_model_type.title()}"""
    except Exception as e:
        error_msg = str(e)
        # Map well-known failure texts to friendlier messages
        if "401 Client Error" in error_msg or "Unauthorized" in error_msg:
            error_response = "β οΈ API Error: Please check if all required API keys are properly configured."
        elif "Connection" in error_msg or "timeout" in error_msg.lower():
            error_response = "β οΈ Connection Error: Cannot reach services. Please check your internet connection."
        else:
            error_response = f"β οΈ An error occurred: {error_msg}"
        return f"""{error_response}
---
{model_emoji} **Attempted with**: {model_name} ({model_provider})"""
def handle_direct_deepseek_response(message: str) -> str:
    """Send ``message`` straight to DeepSeek V3 through the HF Inference Client.

    Args:
        message: The user's question, sent as a single user-role message.

    Returns:
        The model's reply followed by an attribution footer, or a formatted
        error block when the token is missing, the response has no choices,
        or any exception occurs. Exceptions are never propagated.
    """
    attempted_footer = "π€ **Attempted with**: DeepSeek V3 (Hugging Face)"
    try:
        if not HF_API_KEY:
            return "\n".join([
                "β οΈ **DeepSeek Model Error**: HF_API_KEY not configured",
                "Please set your Hugging Face API key to use DeepSeek model.",
                "---",
                attempted_footer,
            ])

        # Build a one-off client and issue a single chat completion
        hf_client = InferenceClient(
            provider="novita",
            api_key=HF_API_KEY,
        )
        reply = hf_client.chat.completions.create(
            model="deepseek-ai/DeepSeek-V3-0324",
            messages=[{"role": "user", "content": message}],
            max_tokens=1000,
            temperature=0.7,
        )

        if not hasattr(reply, 'choices') or len(reply.choices) == 0:
            return "\n".join([
                "β οΈ **DeepSeek API Error**: No response received",
                "Please try again or switch to Anthropic model.",
                "---",
                attempted_footer,
            ])

        answer = reply.choices[0].message.content
        return "\n".join([
            f"{answer}",
            "---",
            "π€ **Response generated by**: DeepSeek V3 (Hugging Face)",
            "β±οΈ **Model Type**: Deepseek (Direct API)",
            "π‘ **Note**: Using direct API due to agent framework compatibility",
        ])
    except Exception as e:
        return "\n".join([
            f"β οΈ **DeepSeek Direct API Error**: {str(e)}",
            "Please check your HF_API_KEY or try switching to Anthropic model.",
            "---",
            attempted_footer,
        ])
def create_interface():
    """Create the Gradio interface.

    Builds a ``gr.Blocks`` app containing a model selector, a question
    textbox, Send/Clear buttons, six example buttons that prefill the
    textbox, and a read-only response box, then wires their events.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="Voyager AI - Smart Travel Assistant", css="""
.container {
max-width: 1200px;
margin: auto;
padding: 20px;
}
.example-button {
background: linear-gradient(45deg, #667eea, #764ba2);
color: white;
border-radius: 8px;
border: none;
margin: 4px;
padding: 8px 16px;
font-size: 14px;
cursor: pointer;
transition: all 0.2s ease;
}
.example-button:hover {
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(0,0,0,0.15);
}
""") as demo:
        def change_model(new_model_type):
            """Change the AI model and reset agent.

            Delegates to ``set_model_type`` and returns a status message for
            the response box; any exception is reported as text.
            """
            global current_model_type, agent, mcp_client
            try:
                set_model_type(new_model_type)
                model_name = "Claude Sonnet 4" if new_model_type == "anthropic" else "DeepSeek V3"
                model_emoji = "π§ " if new_model_type == "anthropic" else "π€"
                if new_model_type == "deepseek":
                    return f"""β **Model Changed Successfully!**
{model_emoji} **Now using**: {model_name} ({new_model_type.title()})
π‘ **Note**: DeepSeek uses direct API for optimal compatibility.
The next response will be generated using the DeepSeek V3 model."""
                else:
                    return f"""β **Model Changed Successfully!**
{model_emoji} **Now using**: {model_name} ({new_model_type.title()})
The next response will be generated using the new model."""
            except Exception as e:
                return f"β **Error changing model**: {str(e)}"
        # Static header describing the advertised capabilities
        gr.Markdown(
            f"""
# π VOYAGER AI
## *Your Smart Travel & Lifestyle Assistant*
### βοΈ What Can I Help You With:
- π **Sentiment Analysis**: Analyze emotions and tone in text
- π¨ **Hotel & Places**: Find accommodations and places to stay
- π½οΈ **Restaurant Search**: Discover dining options and food recommendations
- ποΈ **Hiking Trails**: Find outdoor adventures and nature activities
- π **Web Search**: Get current information, weather, news, and more
"""
        )
        # Model Selection Section
        with gr.Row():
            with gr.Column():
                model_selector = gr.Dropdown(
                    choices=["anthropic", "deepseek"],
                    value=current_model_type,
                    label="π€ Select AI Model",
                    info="Choose between Anthropic Claude Sonnet 4 or DeepSeek V3"
                )
        # Question input
        with gr.Row():
            input_text = gr.Textbox(
                label="π― Ask me anything!",
                placeholder="What would you like to know? Try one of the examples below or ask your own question...",
                lines=3
            )
        with gr.Row():
            submit_btn = gr.Button("π Send", variant="primary")
            clear_btn = gr.Button("π Clear")
        # Clickable example queries
        gr.Markdown("### π‘ Try These Quick Examples:")
        with gr.Row():
            sentiment_btn = gr.Button("π Analyze sentiment: 'Amazing service!'", elem_classes=["example-button"])
            hotel_btn = gr.Button("π¨ Find hotels in Paris", elem_classes=["example-button"])
            restaurant_btn = gr.Button("π½οΈ Best Italian restaurants in Boston", elem_classes=["example-button"])
        with gr.Row():
            hiking_btn = gr.Button("ποΈ Hiking trails near Denver", elem_classes=["example-button"])
            weather_btn = gr.Button("π€οΈ What's the weather today?", elem_classes=["example-button"])
            news_btn = gr.Button("π° Latest tech news", elem_classes=["example-button"])
        # Read-only box that shows answers and model-change status messages
        output_text = gr.Textbox(
            label="β¨ AI Response",
            lines=15,
            interactive=False
        )
        # Model change handler
        model_selector.change(
            fn=change_model,
            inputs=model_selector,
            outputs=output_text
        )
        # Event handlers for main buttons
        submit_btn.click(
            fn=handle_agent_run,
            inputs=input_text,
            outputs=output_text,
            api_name="mcp_query"
        )
        # Clear resets both the input and the response box
        clear_btn.click(
            fn=lambda: ("", ""),
            outputs=[input_text, output_text],
            api_name="clear"
        )
        # Event handlers for example buttons: each only prefills the input
        # box; the prefilled text is longer than the button label
        sentiment_btn.click(
            fn=lambda: "Analyze sentiment: 'Amazing service and delicious food!'",
            outputs=input_text
        )
        hotel_btn.click(
            fn=lambda: "Find luxury hotels in Paris",
            outputs=input_text
        )
        restaurant_btn.click(
            fn=lambda: "Show me top Italian restaurants in Boston",
            outputs=input_text
        )
        hiking_btn.click(
            fn=lambda: "Find moderate hiking trails near Denver",
            outputs=input_text
        )
        weather_btn.click(
            fn=lambda: "What's the weather forecast for this weekend?",
            outputs=input_text
        )
        news_btn.click(
            fn=lambda: "Latest tech news today",
            outputs=input_text
        )
        # Also allow Enter key to submit
        input_text.submit(
            fn=handle_agent_run,
            inputs=input_text,
            outputs=output_text
        )
    return demo
def main():
    """Report configuration, build the Gradio app and serve it.

    The listening port comes from ``GRADIO_SERVER_PORT`` (default 7860) and
    the server binds to all interfaces. Any exception is printed and then
    re-raised.
    """
    try:
        # Environment report comes first
        setup_environment()

        # NOTE: these lines are informational only; no MCP connection is
        # opened in this function
        print("\nπ Initializing MCP Client...")
        print(f"π Target MCP Server: {api_config.mcp_server_url}")
        print("π‘ Transport Protocol: Server-Sent Events (SSE)")

        app = create_interface()
        print("\nπ Launching Voyager AI...")

        # Use environment variable or default port for Hugging Face Spaces
        server_port = int(os.getenv('GRADIO_SERVER_PORT', 7860))
        print(f"π± Client URL: http://127.0.0.1:{server_port}")

        launch_options = {
            "server_name": "0.0.0.0",
            "server_port": server_port,
            "share": False,
        }
        app.launch(**launch_options)
    except Exception as e:
        print(f"\nβ Error during initialization: {str(e)}")
        raise


if __name__ == "__main__":
    main()