# Router-MCP / app.py
# Provenance: Hugging Face Space upload by yijun-lee (commit e6c3213, 17.1 kB).
# Standard library
import csv
import os
from itertools import islice
from typing import List, Dict, Optional

# Third-party
import gradio as gr
import requests
from dotenv import load_dotenv
from huggingface_hub import HfApi
from openai import OpenAI
from pinecone import Pinecone
# Load environment variables from a local .env file, if present.
load_dotenv()

# Initialize the Hugging Face API client; authenticate when HF_TOKEN is set,
# otherwise fall back to anonymous (rate-limited) access.
HF_TOKEN = os.getenv("HF_TOKEN")
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()
def keyword_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCP servers among Hugging Face Spaces tagged "mcp-server".

    Args:
        query: Search query string.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dictionary with "results" (list of space summaries) and "total";
        on failure, an "error" message with empty results.
    """
    try:
        print(f"Debug - Search query: '{query}'")  # Debug log

        # Use the list_spaces API with the mcp-server filter, most-liked first.
        # Fix: list_spaces returns a lazy iterator; the original wrapped it in
        # list(...) and then sliced, materializing every matching space just to
        # keep the first `limit`. islice stops after `limit` items.
        spaces_iter = api.list_spaces(
            search=query,
            sort="likes",
            direction=-1,  # Descending order
            filter="mcp-server",
        )

        results = []
        for space in islice(spaces_iter, limit):
            try:
                results.append({
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface",
                })
            except Exception as e:
                # Best-effort: skip spaces whose metadata is malformed.
                print(f"Error processing space {space.id}: {str(e)}")
                continue

        return {
            "results": results,
            "total": len(results),
        }
    except Exception as e:
        print(f"Debug - Critical error in keyword_search_hf_spaces: {str(e)}")
        return {
            "error": str(e),
            "results": [],
            "total": 0,
        }
def keyword_search_smithery(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCP servers in the Smithery Registry by keyword.

    Args:
        query: Search query string.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dictionary with "results" (list of server summaries) and "total";
        on failure, an "error" message with empty results.
    """
    try:
        # A Smithery API token is required for the registry endpoint.
        SMITHERY_TOKEN = os.getenv("SMITHERY_TOKEN")
        if not SMITHERY_TOKEN:
            return {
                "error": "SMITHERY_TOKEN not found",
                "results": [],
                "total": 0,
            }

        headers = {
            'Authorization': f'Bearer {SMITHERY_TOKEN}'
        }
        # Restrict matches to servers that are actually deployed.
        search_query = f"{query} is:deployed"
        params = {
            'q': search_query,
            'page': 1,
            'pageSize': 100  # Get maximum results
        }

        # Fix: added a timeout — the original request could hang the UI
        # indefinitely if the registry was unreachable.
        response = requests.get(
            'https://registry.smithery.ai/servers',
            headers=headers,
            params=params,
            timeout=15,
        )
        if response.status_code != 200:
            return {
                "error": f"Smithery API error: {response.status_code}",
                "results": [],
                "total": 0,
            }

        data = response.json()
        # Rank by popularity (useCount) and keep only the top `limit` entries.
        servers = sorted(
            data.get('servers', []),
            key=lambda s: s.get('useCount', 0),
            reverse=True,
        )[:limit]

        results = [
            {
                "id": server.get('qualifiedName'),
                "name": server.get('displayName'),
                "description": server.get('description'),
                "likes": server.get('useCount', 0),
                "source": "smithery",
            }
            for server in servers
        ]

        return {
            "results": results,
            "total": len(results),
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0,
        }
def keyword_search(query: str, sources: List[str], limit: int = 3) -> Dict:
    """
    Search for MCPs using keyword matching.

    Args:
        query: Keyword search query.
        sources: List of sources to search from ('huggingface', 'smithery').
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dictionary containing combined search results.
    """
    # Dispatch table: source name -> backend keyword-search function.
    # Iteration order (huggingface first, then smithery) fixes result order.
    backends = {
        "huggingface": keyword_search_hf_spaces,
        "smithery": keyword_search_smithery,
    }

    combined: List[Dict] = []
    for source_name, search_fn in backends.items():
        if source_name in sources:
            combined.extend(search_fn(query, limit).get("results", []))

    return {
        "results": combined,
        "total": len(combined),
        "search_type": "keyword",
    }
def embedding_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCP servers among Hugging Face Spaces via semantic embeddings.

    The query is embedded with OpenAI's text-embedding-3-large model and
    matched against the "hf-mcp" Pinecone index; each hit is then enriched
    with live metadata from the Hugging Face API.

    Args:
        query: Natural language search query.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dictionary with "results" (including a "score" similarity field)
        and "total"; on failure, an "error" message with empty results.
    """
    try:
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        if not pinecone_api_key or not openai_api_key:
            return {
                "error": "API keys not found",
                "results": [],
                "total": 0,
            }

        # Initialize clients.
        pc = Pinecone(api_key=pinecone_api_key)
        index = pc.Index("hf-mcp")
        client = OpenAI(api_key=openai_api_key)

        # Generate the query embedding with OpenAI.
        response = client.embeddings.create(
            input=query,
            model="text-embedding-3-large"
        )
        query_embedding = response.data[0].embedding

        # Retrieve the closest spaces from Pinecone.
        results = index.query(
            namespace="",
            vector=query_embedding,
            top_k=limit
        )
        if not results.matches:
            return {
                "results": [],
                "total": 0,
            }

        space_results = []
        for match in results.matches:
            # Index ids may carry a 'spaces/' prefix; strip it for the HF API.
            repo_id = match.id.replace('spaces/', '')
            try:
                space = api.space_info(repo_id)
                space_results.append({
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface",
                    "score": match.score  # Pinecone similarity score
                })
            except Exception as e:
                # Fix: surface the skip instead of silently dropping the match
                # (the original bound `e` and never used it).
                print(f"Error fetching space info for {repo_id}: {str(e)}")
                continue

        return {
            "results": space_results,
            "total": len(space_results),
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0,
        }
def embedding_search_smithery(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCP servers in the Smithery Registry via semantic embeddings.

    The query is embedded with OpenAI's text-embedding-3-large model and
    matched against the "smithery-mcp" Pinecone index; each hit is then
    enriched with live metadata from the Smithery API.

    Args:
        query: Natural language search query.
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dictionary with "results" (including a "score" similarity field)
        and "total"; on failure, an "error" message with empty results.
    """
    try:
        # Fix: removed redundant function-level imports of Pinecone, OpenAI,
        # and os — all three are already imported at module level.
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        smithery_token = os.getenv('SMITHERY_TOKEN')
        if not pinecone_api_key or not openai_api_key or not smithery_token:
            return {
                "error": "API keys not found",
                "results": [],
                "total": 0,
            }

        # Initialize clients.
        pc = Pinecone(api_key=pinecone_api_key)
        index = pc.Index("smithery-mcp")
        client = OpenAI(api_key=openai_api_key)

        # Generate the query embedding with OpenAI.
        embedding_response = client.embeddings.create(
            input=query,
            model="text-embedding-3-large"
        )
        query_embedding = embedding_response.data[0].embedding

        # Retrieve the closest servers from Pinecone.
        results = index.query(
            namespace="",
            vector=query_embedding,
            top_k=limit
        )
        if not results.matches:
            return {
                "results": [],
                "total": 0,
            }

        headers = {
            'Authorization': f'Bearer {smithery_token}'
        }
        server_results = []
        for match in results.matches:
            try:
                # Fetch server details. Fixes: a distinct variable name avoids
                # clobbering the embedding response above, and the timeout
                # prevents an indefinite hang on a dead registry.
                detail_response = requests.get(
                    f'https://registry.smithery.ai/servers/{match.id}',
                    headers=headers,
                    timeout=15,
                )
                if detail_response.status_code != 200:
                    continue
                server = detail_response.json()
                server_results.append({
                    "id": server.get('qualifiedName'),
                    "name": server.get('displayName'),
                    "description": server.get('description'),
                    "likes": server.get('useCount', 0),
                    "source": "smithery",
                    "score": match.score  # Pinecone similarity score
                })
            except Exception as e:
                # Best-effort: skip servers whose detail lookup fails.
                print(f"Error fetching server info for {match.id}: {str(e)}")
                continue

        return {
            "results": server_results,
            "total": len(server_results),
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0,
        }
def embedding_search(query: str, sources: List[str], limit: int = 3) -> Dict:
    """
    Search for MCPs using semantic embedding matching.

    Args:
        query: Natural language search query.
        sources: List of sources to search from ('huggingface', 'smithery').
        limit: Maximum number of results to return (default: 3).

    Returns:
        Dictionary containing combined search results.
    """
    # Each source maps to (primary semantic search, keyword fallback).
    # Iteration order (huggingface first, then smithery) fixes result order.
    strategies = {
        "huggingface": (embedding_search_hf_spaces, keyword_search_hf_spaces),
        "smithery": (embedding_search_smithery, keyword_search_smithery),
    }

    gathered: List[Dict] = []
    for source_name, (semantic_fn, fallback_fn) in strategies.items():
        if source_name not in sources:
            continue
        try:
            outcome = semantic_fn(query, limit)
        except Exception:
            # Fall back to keyword matching when vector search fails.
            outcome = fallback_fn(query, limit)
        gathered.extend(outcome.get("results", []))

    return {
        "results": gathered,
        "total": len(gathered),
        "search_type": "embedding",
    }
# Create the Gradio interface. The custom CSS tightens spacing around the
# MCP-client radio group.
with gr.Blocks(title="🚦 Router MCP", css="""
#client_radio {
    margin-top: 0 !important;
    padding-top: 0 !important;
}
#client_radio .radio-group {
    gap: 0.5rem !important;
}
""") as demo:
    gr.Markdown("# 🚦 Router MCP")
    gr.Markdown("### Search MCP compatible spaces using natural language")

    with gr.Row():
        with gr.Column():
            # Free-text description of the desired MCP server.
            query_input = gr.Textbox(
                label="Describe the MCP Server you're looking for",
                placeholder="e.g., 'I need an MCP Server that can generate images'"
            )
            gr.Markdown("### Select sources to search")
            hf_checkbox = gr.Checkbox(label="Hugging Face Spaces", value=True)
            smithery_checkbox = gr.Checkbox(label="Smithery", value=False)
            # Placeholder source; deliberately non-interactive.
            registry_checkbox = gr.Checkbox(label="Registry (Coming Soon)", value=False, interactive=False)
            result_limit = gr.Number(
                label="Maximum number of results for each source",
                value=3,
                minimum=1,
                maximum=20,
                step=1
            )
            gr.Markdown("### Select your MCP Client")
            # NOTE(review): client_radio is displayed but its value is not
            # passed to either search handler — confirm whether it is meant
            # to affect results.
            client_radio = gr.Radio(
                choices=["Cursor", "Windsurf", "Claude Desktop", "VS Code", "Gradio"],
                label="",
                value="Cursor",
                interactive=True,
                elem_id="client_radio"
            )
            with gr.Row():
                keyword_search_button = gr.Button("Keyword Search")
                embedding_search_button = gr.Button("Semantic Search")
        with gr.Column():
            # Raw JSON view of whichever search ran last.
            results_output = gr.JSON(label="Search Results")

    # Set up event handlers.
    # NOTE(review): get_sources is never wired to any event, and reading
    # `.value` on components does not reflect live UI state in Gradio
    # callbacks — this looks like dead code; confirm before removing.
    def get_sources():
        return ["huggingface" if hf_checkbox.value else "", "smithery" if smithery_checkbox.value else ""]

    # The docstrings below double as MCP tool descriptions when the app is
    # launched with mcp_server=True.
    def handle_keyword_mcp_search(query: str, hf: bool, sm: bool, limit: int) -> Dict:
        """
        Handle keyword-based search for MCP servers across selected sources. If the client (such as Cursor or Claude) encounters a task it cannot handle, it can use MCP to search for other tools that can.
        Args:
            query (str): The search query string to find matching MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search (converted to "huggingface" string if True)
            sm (bool): Whether to include Smithery in the search (converted to "smithery" string if True)
            limit (int): Maximum number of results to return per source (default: 3)
        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers
                - total: Total number of results
                - search_type: Type of search performed ("keyword")
        """
        return keyword_search(
            query,
            ["huggingface" if hf else "", "smithery" if sm else ""],
            int(limit)
        )

    def handle_embedding_mcp_search(query: str, hf: bool, sm: bool, limit: int) -> Dict:
        """
        Handle semantic embedding-based search for MCP servers across selected sources. If the client (such as Cursor or Claude) encounters a task it cannot handle, it can use MCP to search for other tools that can.
        Args:
            query (str): The natural language search query to find semantically similar MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search (converted to "huggingface" string if True)
            sm (bool): Whether to include Smithery in the search (converted to "smithery" string if True)
            limit (int): Maximum number of results to return per source (default: 3)
        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers with similarity scores
                - total: Total number of results
                - search_type: Type of search performed ("embedding")
        """
        return embedding_search(
            query,
            ["huggingface" if hf else "", "smithery" if sm else ""],
            int(limit)
        )

    # Wire both buttons to their handlers; results render into the JSON panel.
    keyword_search_button.click(
        fn=handle_keyword_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
        outputs=results_output
    )
    embedding_search_button.click(
        fn=handle_embedding_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
        outputs=results_output
    )
    # Disabled Enter-to-search wiring (references a nonexistent
    # handle_embedding_search name; would need renaming before enabling):
    # query_input.submit(
    #     fn=handle_embedding_search,
    #     inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit],
    #     outputs=results_output
    # )

if __name__ == "__main__":
    # mcp_server=True also exposes the wired handlers as MCP tools.
    demo.launch(mcp_server=True)