# Router-MCP / app.py
# (Hugging Face Space application file; file-viewer chrome removed.)
import gradio as gr
import requests
from typing import List, Dict
from huggingface_hub import HfApi
import os
from dotenv import load_dotenv
from pinecone import Pinecone
from openai import OpenAI
import re
# Load environment variables from a local .env file (no-op if the file is absent)
load_dotenv()
# Initialize the Hugging Face Hub client. HF_TOKEN is optional; when present the
# client is authenticated (presumably for higher rate limits — confirm in deploy env).
HF_TOKEN = os.getenv("HF_TOKEN")
api = HfApi(token=HF_TOKEN) if HF_TOKEN else HfApi()
def keyword_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCP servers in Hugging Face Spaces by keyword.

    Queries the Hub for Spaces tagged ``mcp-server``, most-liked first, and
    builds an ``mcp-remote`` client configuration for each.

    Args:
        query: Search query string
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary with "results" (list of space info dicts) and "total".
        On failure an "error" key is added and "results" is empty.
    """
    from itertools import islice  # local import: keeps file-level imports untouched

    try:
        # list_spaces yields lazily; take only `limit` items instead of
        # materializing the full (potentially very large) listing first.
        spaces = islice(
            api.list_spaces(
                search=query,
                sort="likes",
                direction=-1,  # Descending order (most-liked first)
                filter="mcp-server"
            ),
            max(0, int(limit))
        )
        results = []
        for space in spaces:
            try:
                # Convert the space ID to its *.hf.space subdomain form:
                # lowercase, replace every non-alphanumeric char with a hyphen,
                # collapse hyphen runs, strip hyphens at both ends.
                slug = re.sub(r'[^a-z0-9]', '-', space.id.lower())
                slug = re.sub(r'-+', '-', slug).strip('-')
                sse_url = f"https://{slug}.hf.space/gradio_api/mcp/sse"
                results.append({
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface",
                    "configuration": {
                        "mcpServers": {
                            "gradio": {
                                "command": "npx",  # Use npx to run MCP-Remote
                                "args": [
                                    "mcp-remote",
                                    sse_url,
                                    "--transport",
                                    "sse-only"
                                ]
                            }
                        }
                    }
                })
            except Exception:
                # Skip spaces with malformed metadata; keep the rest.
                continue
        return {
            "results": results,
            "total": len(results)
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
def keyword_search_smithery(query: str = "", limit: int = 3, os_type: str = "Mac/Linux") -> Dict:
    """
    Search for MCP servers in the Smithery Registry by keyword.

    Args:
        query: Search query string
        limit: Maximum number of results to return (default: 3)
        os_type: Operating system type ("Mac/Linux", "Windows", "WSL");
            any other value falls back to the Mac/Linux command form.

    Returns:
        Dictionary with "results" (list of server info dicts) and "total".
        On failure an "error" key is added and "results" is empty.
    """
    try:
        # Smithery requires a bearer token.
        SMITHERY_TOKEN = os.getenv("SMITHERY_TOKEN")
        if not SMITHERY_TOKEN:
            return {
                "error": "SMITHERY_TOKEN not found",
                "results": [],
                "total": 0
            }
        headers = {
            'Authorization': f'Bearer {SMITHERY_TOKEN}'
        }
        # Restrict to servers that are actually deployed.
        search_query = f"{query} is:deployed"
        params = {
            'q': search_query,
            'page': 1,
            'pageSize': 100  # Get maximum results
        }
        response = requests.get(
            'https://registry.smithery.ai/servers',
            headers=headers,
            params=params,
            timeout=30  # avoid hanging the UI on a stalled registry
        )
        if response.status_code != 200:
            return {
                "error": f"Smithery API error: {response.status_code}",
                "results": [],
                "total": 0
            }
        data = response.json()
        results = []
        # Rank by popularity (useCount) and keep the top `limit` entries.
        servers = sorted(data.get('servers', []), key=lambda x: x.get('useCount', 0), reverse=True)[:limit]
        for server in servers:
            server_id = server.get('qualifiedName')
            # Server key in the config drops the @author/ prefix.
            config_server_id = server_id.split('/')[-1] if '/' in server_id else server_id
            # OS-specific launcher: Windows must go through cmd /c, WSL wraps
            # npx in wsl. Anything unrecognized defaults to the plain npx form
            # (previously this raised UnboundLocalError for unknown os_type).
            if os_type == "Windows":
                command, arg_prefix = "cmd", ["/c", "npx"]
            elif os_type == "WSL":
                command, arg_prefix = "wsl", ["npx"]
            else:
                command, arg_prefix = "npx", []
            configuration = {
                "mcpServers": {
                    config_server_id: {
                        "command": command,
                        "args": arg_prefix + [
                            "-y",
                            "@smithery/cli@latest",
                            "run",
                            server_id,
                            "--key",
                            "YOUR_SMITHERY_KEY"
                        ]
                    }
                }
            }
            results.append({
                "id": server_id,
                "name": server.get('displayName'),
                "description": server.get('description'),
                "likes": server.get('useCount', 0),
                "source": "smithery",
                "configuration": configuration
            })
        return {
            "results": results,
            "total": len(results)
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
def keyword_search(query: str, sources: List[str], limit: int = 3, os_type: str = "Mac/Linux") -> Dict:
    """
    Run a keyword search for MCP servers across the selected sources.

    Args:
        query: Keyword search query
        sources: Sources to consult ('huggingface', 'smithery')
        limit: Maximum number of results per source (default: 3)
        os_type: Operating system type ("Mac/Linux", "Windows", "WSL")

    Returns:
        Dictionary with the combined "results", their "total" count, and
        "search_type" set to "keyword".
    """
    combined: List[Dict] = []
    if "huggingface" in sources:
        combined.extend(keyword_search_hf_spaces(query, limit).get("results", []))
    if "smithery" in sources:
        combined.extend(keyword_search_smithery(query, limit, os_type).get("results", []))
    return {
        "results": combined,
        "total": len(combined),
        "search_type": "keyword"
    }
def semantic_search_hf_spaces(query: str = "", limit: int = 3) -> Dict:
    """
    Search for MCP servers in Hugging Face Spaces via semantic matching.

    Embeds the query with OpenAI's text-embedding-3-large model and queries
    the Pinecone index "hf-mcp", whose vector IDs carry the space ID.

    Args:
        query: Natural language search query
        limit: Maximum number of results to return (default: 3)

    Returns:
        Dictionary with "results" (space info dicts including the similarity
        "score") and "total". On failure an "error" key is added.
    """
    try:
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        if not pinecone_api_key or not openai_api_key:
            return {
                "error": "API keys not found",
                "results": [],
                "total": 0
            }
        pc = Pinecone(api_key=pinecone_api_key)
        index = pc.Index("hf-mcp")
        client = OpenAI(api_key=openai_api_key)
        response = client.embeddings.create(
            input=query,
            model="text-embedding-3-large"
        )
        query_embedding = response.data[0].embedding
        results = index.query(
            namespace="",
            vector=query_embedding,
            top_k=limit
        )
        if not results.matches:
            return {
                "results": [],
                "total": 0
            }
        space_results = []
        for match in results.matches:
            space_id = match.id
            try:
                # Strip only the leading "spaces/" prefix. str.replace would
                # also mangle any later occurrence of "spaces/" in the ID.
                repo_id = space_id[len('spaces/'):] if space_id.startswith('spaces/') else space_id
                space = api.space_info(repo_id)
                # Convert the space ID to its *.hf.space subdomain form:
                # lowercase, non-alphanumerics to hyphens, collapse runs,
                # strip hyphens at both ends.
                slug = re.sub(r'[^a-z0-9]', '-', space.id.lower())
                slug = re.sub(r'-+', '-', slug).strip('-')
                sse_url = f"https://{slug}.hf.space/gradio_api/mcp/sse"
                space_results.append({
                    "id": space.id,
                    "likes": space.likes,
                    "trending_score": space.trending_score,
                    "source": "huggingface",
                    "score": match.score,
                    "configuration": {
                        "mcpServers": {
                            "gradio": {
                                "command": "npx",  # Use npx to run MCP-Remote
                                "args": [
                                    "mcp-remote",
                                    sse_url,
                                    "--transport",
                                    "sse-only"
                                ]
                            }
                        }
                    }
                })
            except Exception:
                # Skip spaces that can no longer be resolved; keep the rest.
                continue
        return {
            "results": space_results,
            "total": len(space_results)
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
def semantic_search_smithery(query: str = "", limit: int = 3, os_type: str = "Mac/Linux") -> Dict:
    """
    Search for MCP servers in the Smithery Registry via semantic matching.

    Embeds the query with OpenAI's text-embedding-3-large model, queries the
    Pinecone index "smithery-mcp" (vector IDs carry the qualified server
    name), then fetches each server's details from the registry.

    Args:
        query: Natural language search query
        limit: Maximum number of results to return (default: 3)
        os_type: Operating system type ("Mac/Linux", "Windows", "WSL");
            any other value falls back to the Mac/Linux command form.

    Returns:
        Dictionary with "results" (server info dicts including the
        similarity "score") and "total". On failure an "error" key is added.
    """
    # Pinecone, OpenAI, and os are already imported at module level; the
    # previous function-local re-imports were redundant.
    try:
        pinecone_api_key = os.getenv('PINECONE_API_KEY')
        openai_api_key = os.getenv('OPENAI_API_KEY')
        smithery_token = os.getenv('SMITHERY_TOKEN')
        if not pinecone_api_key or not openai_api_key or not smithery_token:
            return {
                "error": "API keys not found",
                "results": [],
                "total": 0
            }
        pc = Pinecone(api_key=pinecone_api_key)
        index = pc.Index("smithery-mcp")
        client = OpenAI(api_key=openai_api_key)
        response = client.embeddings.create(
            input=query,
            model="text-embedding-3-large"
        )
        query_embedding = response.data[0].embedding
        results = index.query(
            namespace="",
            vector=query_embedding,
            top_k=limit
        )
        if not results.matches:
            return {
                "results": [],
                "total": 0
            }
        headers = {
            'Authorization': f'Bearer {smithery_token}'
        }
        server_results = []
        for match in results.matches:
            server_id = match.id
            try:
                detail = requests.get(
                    f'https://registry.smithery.ai/servers/{server_id}',
                    headers=headers,
                    timeout=30  # avoid hanging on a stalled registry
                )
                if detail.status_code != 200:
                    continue
                server = detail.json()
                # Server key in the config drops the @author/ prefix.
                config_server_id = server_id.split('/')[-1] if '/' in server_id else server_id
                # OS-specific launcher: Windows goes through cmd /c, WSL wraps
                # npx in wsl. Anything unrecognized defaults to plain npx
                # (previously an UnboundLocalError silently dropped the result).
                if os_type == "Windows":
                    command, arg_prefix = "cmd", ["/c", "npx"]
                elif os_type == "WSL":
                    command, arg_prefix = "wsl", ["npx"]
                else:
                    command, arg_prefix = "npx", []
                configuration = {
                    "mcpServers": {
                        config_server_id: {
                            "command": command,
                            "args": arg_prefix + [
                                "-y",
                                "@smithery/cli@latest",
                                "run",
                                server_id,
                                "--key",
                                "YOUR_SMITHERY_KEY"
                            ]
                        }
                    }
                }
                server_results.append({
                    "id": server_id,
                    "name": server.get('displayName'),
                    "description": server.get('description'),
                    "likes": server.get('useCount', 0),
                    "source": "smithery",
                    "score": match.score,
                    "configuration": configuration
                })
            except Exception:
                # Skip servers whose details cannot be fetched; keep the rest.
                continue
        return {
            "results": server_results,
            "total": len(server_results)
        }
    except Exception as e:
        return {
            "error": str(e),
            "results": [],
            "total": 0
        }
def semantic_search(query: str, sources: List[str], limit: int = 3, os_type: str = "Mac/Linux") -> Dict:
    """
    Run a semantic search for MCP servers across the selected sources.

    Falls back to keyword search for a source if its vector search raises.

    Args:
        query: Natural language search query
        sources: Sources to consult ('huggingface', 'smithery')
        limit: Maximum number of results per source (default: 3)
        os_type: Operating system type ("Mac/Linux", "Windows", "WSL")

    Returns:
        Dictionary with the combined "results", their "total" count, and
        "search_type" set to "semantic".
    """
    gathered: List[Dict] = []
    if "huggingface" in sources:
        try:
            part = semantic_search_hf_spaces(query, limit)
        except Exception:
            # Fallback to keyword search if vector search fails
            part = keyword_search_hf_spaces(query, limit)
        gathered.extend(part.get("results", []))
    if "smithery" in sources:
        try:
            part = semantic_search_smithery(query, limit, os_type)
        except Exception:
            # Fallback to keyword search if vector search fails
            part = keyword_search_smithery(query, limit, os_type)
        gathered.extend(part.get("results", []))
    return {
        "results": gathered,
        "total": len(gathered),
        "search_type": "semantic"
    }
# Create the Gradio interface. The CSS (1) forces the JSON results viewer to
# render fully expanded and (2) draws a blurred "traffic light" gradient
# behind the title banner.
with gr.Blocks(title="🚦 Router MCP", css="""
    /* Make JSON output expanded by default */
    .json-viewer-container {
        display: block !important;
    }
    .json-viewer-container > .json-viewer-header {
        display: none !important;
    }
    .json-viewer-container > .json-viewer-content {
        display: block !important;
        max-height: none !important;
    }
    .json-viewer-container .json-viewer-item {
        display: block !important;
    }
    .json-viewer-container .json-viewer-item > .json-viewer-header {
        display: none !important;
    }
    .json-viewer-container .json-viewer-item > .json-viewer-content {
        display: block !important;
        max-height: none !important;
    }
    /* Additional selectors for nested items */
    .json-viewer-container .json-viewer-item .json-viewer-item {
        display: block !important;
    }
    .json-viewer-container .json-viewer-item .json-viewer-item > .json-viewer-header {
        display: none !important;
    }
    .json-viewer-container .json-viewer-item .json-viewer-item > .json-viewer-content {
        display: block !important;
        max-height: none !important;
    }
    /* Title styling */
    .title-container {
        text-align: center;
        margin: 0.5rem 0;
        position: relative;
        padding: 0.5rem 0;
        overflow: hidden;
    }
    .title-container h1 {
        display: inline-block;
        position: relative;
        z-index: 1;
        font-size: 1.8rem;
        margin: 0;
        line-height: 1.2;
        mix-blend-mode: multiply;
    }
    .title-container p {
        position: relative;
        z-index: 1;
        font-size: 1rem;
        margin: 0.5rem 0 0 0;
        color: #666;
        mix-blend-mode: multiply;
    }
    .traffic-light {
        position: absolute;
        top: 50%;
        left: 50%;
        transform: translate(-50%, -50%);
        width: 300px;
        height: 40px;
        background: linear-gradient(90deg,
            rgba(255, 0, 0, 0.3) 0%,
            rgba(255, 165, 0, 0.3) 50%,
            rgba(0, 255, 0, 0.3) 100%
        );
        border-radius: 20px;
        z-index: 0;
        filter: blur(20px);
    }
""") as demo:
    # Title banner with the gradient backdrop.
    with gr.Column(elem_classes=["title-container"]):
        gr.HTML('''
            <div class="traffic-light"></div>
            <h1>🚦 Router MCP</h1>
            <p>Your Gateway to Optimal MCP Servers in Seconds</p>
        ''')
    # Left column: query + source/OS options; right column: JSON results.
    with gr.Row():
        with gr.Column():
            gr.Markdown("### Search MCP servers using natural language query")
            query_input = gr.Textbox(
                label="Describe the MCP Server you're looking for",
                placeholder="e.g., 'I need an MCP Server that can generate images'"
            )
            gr.Markdown("### Select sources to search")
            hf_checkbox = gr.Checkbox(label="Hugging Face Spaces", value=True)
            smithery_checkbox = gr.Checkbox(label="Smithery", value=False)
            # Placeholder for a future source; deliberately non-interactive.
            registry_checkbox = gr.Checkbox(label="Registry (Coming Soon)", value=False, interactive=False)
            result_limit = gr.Number(
                label="Maximum number of results for each source",
                value=3,
                minimum=1,
                maximum=20,
                step=1
            )
            gr.Markdown("### Select your OS")
            client_radio = gr.Radio(
                choices=["Mac/Linux", "Windows", "WSL"],
                label="Choose your operating system to get the appropriate command format",
                value="Mac/Linux",  # Default back to Mac/Linux
                interactive=True,
                elem_id="client_radio"
            )
            with gr.Row():
                keyword_search_button = gr.Button("Keyword Search")
                semantic_search_button = gr.Button("Semantic Search")
        with gr.Column():
            results_output = gr.JSON(
                label="Search Results",
                elem_id="results_output"
            )
    # Set up event handlers
    # NOTE(review): get_sources appears unused, and reading .value off
    # component objects does not reflect live UI state in Gradio event
    # handlers — left in place to avoid any behavior change; verify and
    # remove in a follow-up.
    def get_sources():
        return ["huggingface" if hf_checkbox.value else "", "smithery" if smithery_checkbox.value else ""]
    # The two handlers below are also exposed as MCP tools when the app is
    # launched with mcp_server=True; their docstrings double as the tool
    # descriptions, so their wording is runtime-significant.
    def handle_keyword_mcp_search(query: str, hf: bool, sm: bool, limit: int, os_type: str) -> Dict:
        """
        Handle keyword-based search for MCP servers across selected sources. If the client (such as Cursor or Claude) encounters a task it cannot handle, it can use MCP to search for other tools that can.
        Use this search when you know the specific name or keywords of the MCP Server you're looking for.
        Args:
            query (str): The search query string to find matching MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search
            sm (bool): Whether to include Smithery in the search
            limit (int): Maximum number of results to return per source
            os_type (str): Operating system type ("Mac/Linux", "Windows", "WSL")
        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers with their configurations. Each configuration can be added to the MCP Client's config file to register the server.
                - total: Total number of results
                - search_type: Type of search performed ("keyword")
        """
        # Booleans from the checkboxes are mapped to source names; unchecked
        # sources become "" and are simply never matched by keyword_search.
        return keyword_search(
            query,
            ["huggingface" if hf else "", "smithery" if sm else ""],
            int(limit),
            os_type
        )
    def handle_semantic_mcp_search(query: str, hf: bool, sm: bool, limit: int, os_type: str) -> Dict:
        """
        Handle semantic embedding-based search for MCP servers across selected sources. If the client (such as Cursor or Claude) encounters a task it cannot handle, it can use MCP to search for other tools that can.
        Use this search when your query is more abstract or conceptual, as it can understand the meaning and context of your request.
        Args:
            query (str): The natural language search query to find semantically similar MCP servers
            hf (bool): Whether to include Hugging Face Spaces in the search
            sm (bool): Whether to include Smithery in the search
            limit (int): Maximum number of results to return per source
            os_type (str): Operating system type ("Mac/Linux", "Windows", "WSL")
        Returns:
            Dict: A dictionary containing the search results with the following keys:
                - results: List of found MCP servers with their configurations and similarity scores. Each configuration can be added to the MCP Client's config file to register the server.
                - total: Total number of results
                - search_type: Type of search performed ("semantic")
        """
        return semantic_search(
            query,
            ["huggingface" if hf else "", "smithery" if sm else ""],
            int(limit),
            os_type
        )
    # Wire both buttons to the same inputs; results land in the JSON viewer.
    keyword_search_button.click(
        fn=handle_keyword_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit, client_radio],
        outputs=results_output
    )
    semantic_search_button.click(
        fn=handle_semantic_mcp_search,
        inputs=[query_input, hf_checkbox, smithery_checkbox, result_limit, client_radio],
        outputs=results_output
    )
if __name__ == "__main__":
    # mcp_server=True additionally serves the handlers as MCP tools.
    demo.launch(mcp_server=True)