import gradio as gr
import requests
from typing import List, Dict
from huggingface_hub import HfApi
import os
from dotenv import load_dotenv
from pinecone import Pinecone
from openai import OpenAI

# Load environment variables
load_dotenv()

# Initialize HF API with token if available
HF_TOKEN = os.getenv("HF_TOKEN")
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()
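# The secrets read via os.getenv in this file are expected in the Space's
# settings or a local .env file (names taken from the calls below):
#   HF_TOKEN          - optional; authenticates the HfApi client
#   SMITHERY_TOKEN    - required for the Smithery Registry endpoints
#   PINECONE_API_KEY  - required for semantic (embedding) search
#   OPENAI_API_KEY    - required to embed queries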
def keyword_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCPs in Hugging Face Spaces.

    Args:
        query: Search query string
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary containing search results with MCP information
    """
    try:
        print(f"Debug - Search query: '{query}'")  # Debug log

        # Use the list_spaces API with the mcp-server filter, sorted by likes
        spaces = list(api.list_spaces(
            search=query,
            sort="likes",
            direction=-1,  # Descending order
            filter="mcp-server"
        ))

        results = []
        for space in spaces[:limit]:  # Process up to `limit` matches
            try:
                space_info = {
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface"
                }
                results.append(space_info)
            except Exception as e:
                print(f"Error processing space {space.id}: {str(e)}")
                continue

        return {
            "results": results,
            "total": len(results)
        }
    except Exception as e:
        print(f"Debug - Critical error in keyword_search_hf_spaces: {str(e)}")
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
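# Illustrative result shape for keyword_search_hf_spaces (hypothetical values):
# {
#     "results": [
#         {"id": "someuser/some-mcp-space", "likes": 42,
#          "trending_score": 3, "source": "huggingface"}
#     ],
#     "total": 1
# }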
def keyword_search_smithery(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCPs in Smithery Registry.

    Args:
        query: Search query string
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary containing search results with MCP information
    """
    try:
        # Get Smithery token from environment
        SMITHERY_TOKEN = os.getenv("SMITHERY_TOKEN")
        if not SMITHERY_TOKEN:
            return {
                "error": "SMITHERY_TOKEN not found",
                "results": [],
                "total": 0
            }

        # Prepare headers and query parameters
        headers = {
            'Authorization': f'Bearer {SMITHERY_TOKEN}'
        }

        # Restrict results to deployed servers
        search_query = f"{query} is:deployed"
        params = {
            'q': search_query,
            'page': 1,
            'pageSize': 100  # Fetch a large page, then rank locally
        }

        # Make API request
        response = requests.get(
            'https://registry.smithery.ai/servers',
            headers=headers,
            params=params
        )
        if response.status_code != 200:
            return {
                "error": f"Smithery API error: {response.status_code}",
                "results": [],
                "total": 0
            }

        # Parse response
        data = response.json()
        results = []

        # Sort servers by useCount and keep the top `limit`
        servers = sorted(
            data.get('servers', []),
            key=lambda x: x.get('useCount', 0),
            reverse=True
        )[:limit]
        for server in servers:
            server_info = {
                "id": server.get('qualifiedName'),
                "name": server.get('displayName'),
                "description": server.get('description'),
                "likes": server.get('useCount', 0),
                "source": "smithery"
            }
            results.append(server_info)

        return {
            "results": results,
            "total": len(results)
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
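# Illustrative result shape for keyword_search_smithery (hypothetical values):
# {
#     "results": [
#         {"id": "org/server-name", "name": "Server Name",
#          "description": "...", "likes": 120, "source": "smithery"}
#     ],
#     "total": 1
# }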
def keyword_search(query: str, sources: List[str], limit: int = 3) -> Dict:
    """
    Search for MCPs using keyword matching.

    Args:
        query: Keyword search query
        sources: List of sources to search from ('huggingface', 'smithery')
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary containing combined search results
    """
    all_results = []
    if "huggingface" in sources:
        hf_results = keyword_search_hf_spaces(query, limit)
        all_results.extend(hf_results.get("results", []))
    if "smithery" in sources:
        smithery_results = keyword_search_smithery(query, limit)
        all_results.extend(smithery_results.get("results", []))
    return {
        "results": all_results,
        "total": len(all_results),
        "search_type": "keyword"
    }
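# Example usage (illustrative query):
# keyword_search("image generation", ["huggingface", "smithery"], limit=3)
# returns up to `limit` results per selected source, merged into one list.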
def embedding_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCPs in Hugging Face Spaces using semantic embedding matching.

    Args:
        query: Natural language search query
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary containing search results with MCP information
    """
    try:
        print("[DEBUG] embedding_search_hf_spaces called")
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        print(f"[DEBUG] pinecone_api_key exists: {pinecone_api_key is not None}, "
              f"openai_api_key exists: {openai_api_key is not None}")
        if not pinecone_api_key or not openai_api_key:
            print("[ERROR] API keys not found")
            return {
                "error": "API keys not found",
                "results": [],
                "total": 0
            }

        print("[DEBUG] Initializing Pinecone and OpenAI clients")
        pc = Pinecone(api_key=pinecone_api_key)
        index = pc.Index("hf-mcp")
        client = OpenAI(api_key=openai_api_key)

        print("[DEBUG] Generating embedding with OpenAI")
        response = client.embeddings.create(
            input=query,
            model="text-embedding-3-large"
        )
        query_embedding = response.data[0].embedding
        print(f"[DEBUG] Embedding generated: {type(query_embedding)}, len={len(query_embedding)}")

        print("[DEBUG] Querying Pinecone index")
        results = index.query(
            namespace="",
            vector=query_embedding,
            top_k=limit
        )
        print(f"[DEBUG] Pinecone query results: {results}")

        space_results = []
        if not results.matches:
            print("[DEBUG] No matches found in Pinecone results")
            return {
                "results": [],
                "total": 0
            }

        for match in results.matches:
            space_id = match.id
            try:
                repo_id = space_id.replace('spaces/', '')
                print(f"[DEBUG] Fetching space info for repo_id: {repo_id}")
                space = api.space_info(repo_id)
                space_info = {
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface",
                    "score": match.score
                }
                space_results.append(space_info)
            except Exception as e:
                print(f"[ERROR] Error fetching space info for {space_id}: {str(e)}")
                continue

        return {
            "results": space_results,
            "total": len(space_results)
        }
    except Exception as e:
        print(f"[CRITICAL ERROR] in embedding_search_hf_spaces: {str(e)}")
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
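# Note: the `spaces/` prefix stripped above implies the "hf-mcp" Pinecone
# index stores vectors keyed as "spaces/<owner>/<name>". This convention is
# inferred from the code, not documented elsewhere in this file.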
def embedding_search_smithery(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCPs in Smithery Registry using semantic embedding matching.

    Args:
        query: Natural language search query
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary containing search results with MCP information
    """
    try:
        print("[DEBUG] embedding_search_smithery called")
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        smithery_token = os.getenv('SMITHERY_TOKEN')
        print(f"[DEBUG] pinecone_api_key exists: {pinecone_api_key is not None}, "
              f"openai_api_key exists: {openai_api_key is not None}, "
              f"smithery_token exists: {smithery_token is not None}")
        if not pinecone_api_key or not openai_api_key or not smithery_token:
            print("[ERROR] API keys not found")
            return {
                "error": "API keys not found",
                "results": [],
                "total": 0
            }

        print("[DEBUG] Initializing Pinecone and OpenAI clients")
        pc = Pinecone(api_key=pinecone_api_key)
        index = pc.Index("smithery-mcp")
        client = OpenAI(api_key=openai_api_key)

        print("[DEBUG] Generating embedding with OpenAI")
        response = client.embeddings.create(
            input=query,
            model="text-embedding-3-large"
        )
        query_embedding = response.data[0].embedding
        print(f"[DEBUG] Embedding generated: {type(query_embedding)}, len={len(query_embedding)}")

        print("[DEBUG] Querying Pinecone index")
        results = index.query(
            namespace="",
            vector=query_embedding,
            top_k=limit
        )
        print(f"[DEBUG] Pinecone query results: {results}")

        server_results = []
        if not results.matches:
            print("[DEBUG] No matches found in Pinecone results")
            return {
                "results": [],
                "total": 0
            }

        headers = {
            'Authorization': f'Bearer {smithery_token}'
        }
        for match in results.matches:
            server_id = match.id
            try:
                print(f"[DEBUG] Fetching server info for server_id: {server_id}")
                response = requests.get(
                    f'https://registry.smithery.ai/servers/{server_id}',
                    headers=headers
                )
                if response.status_code != 200:
                    print(f"[ERROR] Smithery API error for {server_id}: {response.status_code}")
                    continue
                server = response.json()
                server_info = {
                    "id": server.get('qualifiedName'),
                    "name": server.get('displayName'),
                    "description": server.get('description'),
                    "likes": server.get('useCount', 0),
                    "source": "smithery",
                    "score": match.score
                }
                server_results.append(server_info)
            except Exception as e:
                print(f"[ERROR] Error fetching server info for {server_id}: {str(e)}")
                continue

        return {
            "results": server_results,
            "total": len(server_results)
        }
    except Exception as e:
        print(f"[CRITICAL ERROR] in embedding_search_smithery: {str(e)}")
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
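# Note: match.id is used directly as the path segment of the Smithery detail
# endpoint above, which implies the "smithery-mcp" index keys its vectors by
# each server's qualifiedName (inferred from the code, not documented).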
def embedding_search(query: str, sources: List[str], limit: int = 3) -> Dict:
    """
    Search for MCPs using semantic embedding matching.

    Args:
        query: Natural language search query
        sources: List of sources to search from ('huggingface', 'smithery')
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary containing combined search results
    """
    all_results = []
    if "huggingface" in sources:
        try:
            hf_results = embedding_search_hf_spaces(query, limit)
            all_results.extend(hf_results.get("results", []))
        except Exception:
            # Fall back to keyword search if vector search fails
            hf_results = keyword_search_hf_spaces(query, limit)
            all_results.extend(hf_results.get("results", []))
    if "smithery" in sources:
        try:
            smithery_results = embedding_search_smithery(query, limit)
            all_results.extend(smithery_results.get("results", []))
        except Exception:
            # Fall back to keyword search if vector search fails
            smithery_results = keyword_search_smithery(query, limit)
            all_results.extend(smithery_results.get("results", []))
    return {
        "results": all_results,
        "total": len(all_results),
        "search_type": "embedding"
    }
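# Example usage (illustrative query):
# embedding_search("I need a server that can generate images", ["huggingface"])
# Unlike keyword results, each embedding result also carries a "score" field
# holding the Pinecone similarity score.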
# Create the Gradio interface
with gr.Blocks(title="🦙 Router MCP", css="""
    #client_radio {
        margin-top: 0 !important;
        padding-top: 0 !important;
    }
    #client_radio .radio-group {
        gap: 0.5rem !important;
    }
""") as demo:
    gr.Markdown("# 🦙 Router MCP")
    gr.Markdown("### Search MCP-compatible Spaces using natural language")

    with gr.Row():
        with gr.Column():
            query_input = gr.Textbox(
                label="Describe the MCP Server you're looking for",
                placeholder="e.g., 'I need an MCP Server that can generate images'"
            )
            gr.Markdown("### Select sources to search")
            hf_checkbox = gr.Checkbox(label="Hugging Face Spaces", value=True)
            smithery_checkbox = gr.Checkbox(label="Smithery", value=False)
            registry_checkbox = gr.Checkbox(label="Registry (Coming Soon)", value=False, interactive=False)
            result_limit = gr.Number(
                label="Maximum number of results for each source",
                value=3,
                minimum=1,
                maximum=20,
                step=1
            )
            gr.Markdown("### Select your MCP Client")
            client_radio = gr.Radio(
                choices=["Cursor", "Windsurf", "Claude Desktop", "VS Code", "Gradio"],
                label="",
                value="Cursor",
                interactive=True,
                elem_id="client_radio"
            )
            with gr.Row():
                keyword_search_button = gr.Button("Keyword Search")
                embedding_search_button = gr.Button("Semantic Search")
        with gr.Column():
            results_output = gr.JSON(label="Search Results")
    # Set up event handlers. Gradio passes the current component values to the
    # handlers via `inputs=`, so the checkbox states arrive as booleans.
    def handle_keyword_mcp_search(query: str, hf: bool, sm: bool, limit: int) -> Dict:
        """
        Handle keyword-based search for MCP servers across the selected sources.
        If the client (such as Cursor or Claude) encounters a task it cannot
        handle, it can use MCP to search for other tools that can.

        Args:
            query (str): The search query string to find matching MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search
            sm (bool): Whether to include Smithery in the search
            limit (int): Maximum number of results to return per source (default: 3)

        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers
                - total: Total number of results
                - search_type: Type of search performed ("keyword")
        """
        sources = []
        if hf:
            sources.append("huggingface")
        if sm:
            sources.append("smithery")
        return keyword_search(query, sources, int(limit))

    def handle_embedding_mcp_search(query: str, hf: bool, sm: bool, limit: int) -> Dict:
        """
        Handle semantic embedding-based search for MCP servers across the
        selected sources. If the client (such as Cursor or Claude) encounters
        a task it cannot handle, it can use MCP to search for other tools that can.

        Args:
            query (str): The natural language search query to find semantically similar MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search
            sm (bool): Whether to include Smithery in the search
            limit (int): Maximum number of results to return per source (default: 3)

        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers with similarity scores
                - total: Total number of results
                - search_type: Type of search performed ("embedding")
        """
        sources = []
        if hf:
            sources.append("huggingface")
        if sm:
            sources.append("smithery")
        return embedding_search(query, sources, int(limit))
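    # Note: when the app is launched with mcp_server=True (see the bottom of
    # this file), Gradio also exposes these handlers as MCP tools, and the
    # docstrings above double as the tool descriptions shown to MCP clients.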
    keyword_search_button.click(
        fn=handle_keyword_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
        outputs=results_output
    )
    embedding_search_button.click(
        fn=handle_embedding_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
        outputs=results_output
    )
    # query_input.submit(
    #     fn=handle_embedding_mcp_search,
    #     inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
    #     outputs=results_output
    # )
if __name__ == "__main__":
    demo.launch(mcp_server=True)