"""🚦 Router MCP — search for MCP servers across Hugging Face Spaces and Smithery.

Provides keyword search (registry-native filtering) and semantic search
(OpenAI embeddings + Pinecone) behind a Gradio UI that also runs as an MCP
server (``demo.launch(mcp_server=True)``), so MCP clients can discover other
tools at runtime.
"""

import gradio as gr
import requests
from typing import List, Dict, Optional  # NOTE(review): Optional appears unused — confirm before removing
from huggingface_hub import HfApi
import os
from dotenv import load_dotenv
import csv  # NOTE(review): csv appears unused — confirm before removing
from pinecone import Pinecone
from openai import OpenAI

# Load environment variables from a local .env file, if present.
load_dotenv()

# Initialize HF API client; a token is optional (anonymous access works for
# public Spaces, a token raises rate limits).
HF_TOKEN = os.getenv("HF_TOKEN")
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()

# Timeout (seconds) for all outbound HTTP requests so a slow registry cannot
# hang the UI / MCP call indefinitely.
REQUEST_TIMEOUT = 30


def keyword_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """Search Hugging Face Spaces tagged ``mcp-server`` by keyword.

    Args:
        query: Search query string.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dict with ``results`` (list of space-info dicts) and ``total``;
        on failure an ``error`` key is added and ``results`` is empty.
    """
    try:
        print(f"Debug - Search query: '{query}'")  # Debug log

        # Use list_spaces API with mcp-server filter and sort by likes.
        spaces = list(api.list_spaces(
            search=query,
            sort="likes",
            direction=-1,  # Descending order
            filter="mcp-server",
        ))

        results = []
        for space in spaces[:limit]:  # Process up to `limit` matches
            try:
                results.append({
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface",
                })
            except Exception as e:
                # Skip spaces whose metadata cannot be read; keep the rest.
                print(f"Error processing space {space.id}: {str(e)}")
                continue

        return {"results": results, "total": len(results)}

    except Exception as e:
        print(f"Debug - Critical error in keyword_search_hf_spaces: {str(e)}")
        return {"error": str(e), "results": [], "total": 0}


def keyword_search_smithery(query: str = "", limit: int = 3) -> Dict:
    """Search the Smithery Registry by keyword.

    Args:
        query: Search query string.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dict with ``results`` (list of server-info dicts) and ``total``;
        on failure an ``error`` key is added and ``results`` is empty.
    """
    try:
        # Smithery requires a bearer token.
        SMITHERY_TOKEN = os.getenv("SMITHERY_TOKEN")
        if not SMITHERY_TOKEN:
            return {"error": "SMITHERY_TOKEN not found", "results": [], "total": 0}

        headers = {'Authorization': f'Bearer {SMITHERY_TOKEN}'}

        # Restrict results to deployed servers via Smithery's query syntax.
        search_query = f"{query} is:deployed"
        params = {
            'q': search_query,
            'page': 1,
            'pageSize': 100,  # Get maximum results, then rank locally
        }

        response = requests.get(
            'https://registry.smithery.ai/servers',
            headers=headers,
            params=params,
            timeout=REQUEST_TIMEOUT,  # fix: never hang on a slow registry
        )
        if response.status_code != 200:
            return {
                "error": f"Smithery API error: {response.status_code}",
                "results": [],
                "total": 0,
            }

        data = response.json()

        # Rank by popularity (useCount) and keep the top `limit`.
        servers = sorted(
            data.get('servers', []),
            key=lambda x: x.get('useCount', 0),
            reverse=True,
        )[:limit]

        results = [
            {
                "id": server.get('qualifiedName'),
                "name": server.get('displayName'),
                "description": server.get('description'),
                "likes": server.get('useCount', 0),
                "source": "smithery",
            }
            for server in servers
        ]

        return {"results": results, "total": len(results)}

    except Exception as e:
        return {"error": str(e), "results": [], "total": 0}


def keyword_search(query: str, sources: List[str], limit: int = 3) -> Dict:
    """Search for MCPs using keyword matching across the selected sources.

    Args:
        query: Keyword search query.
        sources: Sources to search ('huggingface', 'smithery').
        limit: Maximum number of results per source (default: 3).

    Returns:
        Dict with combined ``results``, ``total``, and ``search_type``.
    """
    all_results = []

    if "huggingface" in sources:
        hf_results = keyword_search_hf_spaces(query, limit)
        all_results.extend(hf_results.get("results", []))

    if "smithery" in sources:
        smithery_results = keyword_search_smithery(query, limit)
        all_results.extend(smithery_results.get("results", []))

    return {
        "results": all_results,
        "total": len(all_results),
        "search_type": "keyword",
    }


def _embed_and_query(index_name: str, query: str, limit: int,
                     pinecone_api_key: str, openai_api_key: str):
    """Embed `query` with OpenAI and return the top-`limit` matches from the
    named Pinecone index (shared by both embedding search backends)."""
    print("[DEBUG] Initializing Pinecone and OpenAI clients")
    pc = Pinecone(api_key=pinecone_api_key)
    index = pc.Index(index_name)
    client = OpenAI(api_key=openai_api_key)

    print("[DEBUG] Generating embedding with OpenAI")
    response = client.embeddings.create(
        input=query,
        model="text-embedding-3-large",
    )
    query_embedding = response.data[0].embedding
    print(f"[DEBUG] Embedding generated: {type(query_embedding)}, len={len(query_embedding)}")

    print("[DEBUG] Querying Pinecone index")
    return index.query(
        namespace="",
        vector=query_embedding,
        top_k=limit,
    )


def embedding_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """Semantic search over Hugging Face Spaces via the "hf-mcp" Pinecone index.

    Args:
        query: Natural language search query.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dict with ``results`` (space dicts incl. similarity ``score``) and
        ``total``; on failure an ``error`` key is added.
    """
    try:
        print("[DEBUG] embedding_search_hf_spaces called")
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        print(f"[DEBUG] pinecone_api_key exists: {pinecone_api_key is not None}, openai_api_key exists: {openai_api_key is not None}")
        if not pinecone_api_key or not openai_api_key:
            print("[ERROR] API keys not found")
            return {"error": "API keys not found", "results": [], "total": 0}

        results = _embed_and_query("hf-mcp", query, limit,
                                   pinecone_api_key, openai_api_key)
        print(f"[DEBUG] Pinecone query results: {results}")

        if not results.matches:
            print("[DEBUG] No matches found in Pinecone results")
            return {"results": [], "total": 0}

        space_results = []
        for match in results.matches:
            space_id = match.id
            try:
                # The index stores ids as "spaces/<repo_id>"; strip the prefix.
                repo_id = space_id.replace('spaces/', '')
                print(f"[DEBUG] Fetching space info for repo_id: {repo_id}")
                space = api.space_info(repo_id)
                space_results.append({
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface",
                    "score": match.score,  # similarity score from Pinecone
                })
            except Exception as e:
                print(f"[ERROR] Error fetching space info for {space_id}: {str(e)}")
                continue

        return {"results": space_results, "total": len(space_results)}

    except Exception as e:
        print(f"[CRITICAL ERROR] in embedding_search_hf_spaces: {str(e)}")
        return {"error": str(e), "results": [], "total": 0}


def embedding_search_smithery(query: str = "", limit: int = 3) -> Dict:
    """Semantic search over Smithery via the "smithery-mcp" Pinecone index.

    Args:
        query: Natural language search query.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dict with ``results`` (server dicts incl. similarity ``score``) and
        ``total``; on failure an ``error`` key is added.
    """
    try:
        print("[DEBUG] embedding_search_smithery called")
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        smithery_token = os.getenv('SMITHERY_TOKEN')
        print(f"[DEBUG] pinecone_api_key exists: {pinecone_api_key is not None}, openai_api_key exists: {openai_api_key is not None}, smithery_token exists: {smithery_token is not None}")
        if not pinecone_api_key or not openai_api_key or not smithery_token:
            print("[ERROR] API keys not found")
            return {"error": "API keys not found", "results": [], "total": 0}

        results = _embed_and_query("smithery-mcp", query, limit,
                                   pinecone_api_key, openai_api_key)
        print(f"[DEBUG] Pinecone query results: {results}")

        if not results.matches:
            print("[DEBUG] No matches found in Pinecone results")
            return {"results": [], "total": 0}

        headers = {'Authorization': f'Bearer {smithery_token}'}

        server_results = []
        for match in results.matches:
            server_id = match.id
            try:
                print(f"[DEBUG] Fetching server info for server_id: {server_id}")
                response = requests.get(
                    f'https://registry.smithery.ai/servers/{server_id}',
                    headers=headers,
                    timeout=REQUEST_TIMEOUT,  # fix: never hang on a slow registry
                )
                if response.status_code != 200:
                    print(f"[ERROR] Smithery API error for {server_id}: {response.status_code}")
                    continue
                server = response.json()
                server_results.append({
                    "id": server.get('qualifiedName'),
                    "name": server.get('displayName'),
                    "description": server.get('description'),
                    "likes": server.get('useCount', 0),
                    "source": "smithery",
                    "score": match.score,  # similarity score from Pinecone
                })
            except Exception as e:
                print(f"[ERROR] Error fetching server info for {server_id}: {str(e)}")
                continue

        return {"results": server_results, "total": len(server_results)}

    except Exception as e:
        print(f"[CRITICAL ERROR] in embedding_search_smithery: {str(e)}")
        return {"error": str(e), "results": [], "total": 0}


def embedding_search(query: str, sources: List[str], limit: int = 3) -> Dict:
    """Search for MCPs using semantic embedding matching across sources.

    Falls back to keyword search for a source if its vector search raises.

    Args:
        query: Natural language search query.
        sources: Sources to search ('huggingface', 'smithery').
        limit: Maximum number of results per source (default: 3).

    Returns:
        Dict with combined ``results``, ``total``, and ``search_type``.
    """
    all_results = []

    if "huggingface" in sources:
        try:
            hf_results = embedding_search_hf_spaces(query, limit)
            all_results.extend(hf_results.get("results", []))
        except Exception as e:
            # Fallback to keyword search if vector search fails.
            print(f"[WARN] HF embedding search failed, falling back to keyword: {str(e)}")
            hf_results = keyword_search_hf_spaces(query, limit)
            all_results.extend(hf_results.get("results", []))

    if "smithery" in sources:
        try:
            smithery_results = embedding_search_smithery(query, limit)
            all_results.extend(smithery_results.get("results", []))
        except Exception as e:
            # Fallback to keyword search if vector search fails.
            print(f"[WARN] Smithery embedding search failed, falling back to keyword: {str(e)}")
            smithery_results = keyword_search_smithery(query, limit)
            all_results.extend(smithery_results.get("results", []))

    return {
        "results": all_results,
        "total": len(all_results),
        "search_type": "embedding",
    }


def _selected_sources(hf: bool, sm: bool) -> List[str]:
    """Translate checkbox booleans into the source-name list the search
    functions expect (empty entries are never produced)."""
    sources = []
    if hf:
        sources.append("huggingface")
    if sm:
        sources.append("smithery")
    return sources


# Create the Gradio interface
with gr.Blocks(title="🚦 Router MCP", css="""
    #client_radio {
        margin-top: 0 !important;
        padding-top: 0 !important;
    }
    #client_radio .radio-group {
        gap: 0.5rem !important;
    }
""") as demo:
    gr.Markdown("# 🚦 Router MCP")
    gr.Markdown("### Search MCP compatible spaces using natural language")

    with gr.Row():
        with gr.Column():
            query_input = gr.Textbox(
                label="Describe the MCP Server you're looking for",
                placeholder="e.g., 'I need an MCP Server that can generate images'"
            )
            gr.Markdown("### Select sources to search")
            hf_checkbox = gr.Checkbox(label="Hugging Face Spaces", value=True)
            smithery_checkbox = gr.Checkbox(label="Smithery", value=False)
            registry_checkbox = gr.Checkbox(
                label="Registry (Coming Soon)", value=False, interactive=False
            )
            result_limit = gr.Number(
                label="Maximum number of results for each source",
                value=3,
                minimum=1,
                maximum=20,
                step=1
            )
            gr.Markdown("### Select your MCP Client")
            client_radio = gr.Radio(
                choices=["Cursor", "Windsurf", "Claude Desktop", "VS Code", "Gradio"],
                label="",
                value="Cursor",
                interactive=True,
                elem_id="client_radio"
            )
            with gr.Row():
                keyword_search_button = gr.Button("Keyword Search")
                embedding_search_button = gr.Button("Semantic Search")

        with gr.Column():
            results_output = gr.JSON(label="Search Results")

    def handle_keyword_mcp_search(query: str, hf: bool, sm: bool, limit: int) -> Dict:
        """
        Handle keyword-based search for MCP servers across selected sources.
        If the client (such as Cursor or Claude) encounters a task it cannot handle,
        it can use MCP to search for other tools that can.

        Args:
            query (str): The search query string to find matching MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search
            sm (bool): Whether to include Smithery in the search
            limit (int): Maximum number of results to return per source (default: 3)

        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers
                - total: Total number of results
                - search_type: Type of search performed ("keyword")
        """
        return keyword_search(query, _selected_sources(hf, sm), int(limit))

    def handle_embedding_mcp_search(query: str, hf: bool, sm: bool, limit: int) -> Dict:
        """
        Handle semantic embedding-based search for MCP servers across selected sources.
        If the client (such as Cursor or Claude) encounters a task it cannot handle,
        it can use MCP to search for other tools that can.

        Args:
            query (str): The natural language search query to find semantically similar MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search
            sm (bool): Whether to include Smithery in the search
            limit (int): Maximum number of results to return per source (default: 3)

        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers with similarity scores
                - total: Total number of results
                - search_type: Type of search performed ("embedding")
        """
        return embedding_search(query, _selected_sources(hf, sm), int(limit))

    keyword_search_button.click(
        fn=handle_keyword_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
        outputs=results_output
    )

    embedding_search_button.click(
        fn=handle_embedding_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
        outputs=results_output
    )

if __name__ == "__main__":
    demo.launch(mcp_server=True)