Eventure-Project-Overview / orchestration /orchestrator_mcp_server.py
yuki-sui's picture
Upload 169 files
ed71b0e verified
"""
Event Orchestrator MCP Server
Provides event discovery tools through Model Context Protocol (MCP).
Coordinates multiple MCP event discovery tools in parallel.
Wraps the EventOrchestrator to expose it as an MCP server with tools for:
- find_events: Search for events using all available tools
- get_orchestrator_status: Report gateway connectivity and API key configuration
- get_available_tools: Describe the available event discovery tools
"""
import os
import logging
from typing import Any, Optional
import asyncio
from fastmcp import FastMCP
from pydantic import BaseModel
import httpx
# Import the orchestrator
from event_orchestrator import EventOrchestrator
# Set up process-wide logging at import time and create this module's logger.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# ============================================================================
# Tool Models
# ============================================================================
class FindEventsInput(BaseModel):
    """Input for find_events tool.

    Field names and defaults mirror the parameters of the ``find_events``
    tool, except that ``interests`` defaults to an empty list here rather
    than ``None``.

    NOTE(review): nothing in the visible part of this file references this
    model (the tool takes plain keyword parameters) - confirm it is still
    needed.
    """
    # Main search query (e.g. "rock concerts", "tech conferences").
    query: str
    # Target city for events.
    city: str
    # Target country.
    country: str = "USA"
    # Event date range filter (e.g. "2025-01"); empty string means no filter
    # was supplied.
    date_range: str = ""
    # User interests used for ranking (e.g. ["music", "live"]).
    # NOTE(review): element type is unconstrained - presumably strings.
    interests: list = []
    # LLM provider used for tool selection (google, openai, anthropic).
    llm_provider: str = "google"
class EventResult(BaseModel):
    """Event result from orchestrator.

    ``name``, ``date``, ``location``, ``description``, ``source`` and
    ``confidence`` are required; ``url``, ``price`` and ``booking_url``
    default to an empty string.

    NOTE(review): the ``find_events`` tool in this file returns the
    orchestrator's ``events`` list as-is and does not build or validate
    ``EventResult`` objects - confirm the orchestrator's event dicts really
    have this shape.
    """
    name: str
    # Kept as a string; the expected date format is not established here.
    date: str
    location: str
    description: str
    url: str = ""
    # NOTE(review): presumably the tool/platform that produced the event.
    source: str
    # Free-form string (not numeric); empty when unknown.
    price: str = ""
    # NOTE(review): scale/range of the score is not visible in this file.
    confidence: float
    booking_url: str = ""
# ============================================================================
# Initialize MCP Server
# ============================================================================
def make_mcp_server() -> FastMCP:
    """Create and configure the Event Orchestrator MCP server.

    Builds a ``FastMCP`` instance named ``event-orchestrator-mcp`` and
    registers three tools on it:

    - ``find_events``: run an event search through ``EventOrchestrator``.
    - ``get_orchestrator_status``: report gateway reachability and which
      API-key environment variables are set.
    - ``get_available_tools``: return a static catalogue of discovery tools.

    Returns:
        The configured ``FastMCP`` server (not yet serving).
    """
    mcp = FastMCP("event-orchestrator-mcp")

    # Orchestrator shared by every find_events call on this server. It is
    # created lazily on the first call, so building the server itself does
    # not require GATEWAY_URL to be set.
    orchestrator_instance = None

    # ========================================================================
    # Tools
    # ========================================================================

    @mcp.tool()
    async def find_events(
        query: str,
        city: str,
        country: str = "USA",
        date_range: str = "",
        interests: Optional[list] = None,
        llm_provider: str = "google",
    ) -> dict:
        """
        Find events using coordinated multi-tool discovery.

        Searches for events in the specified location using:
        - Web Search (Brave)
        - Gemini Search (AI-based)
        - Eventbrite (Event platform)
        - Ticketmaster (Ticketing platform)
        - Ultimate Event Scraper (Web crawler)
        - Jina AI (Content extraction)

        Args:
            query: Main search query (e.g., "rock concerts", "tech conferences")
            city: Target city for events
            country: Target country (default: USA)
            date_range: Event date range filter (e.g., "2025-01")
            interests: List of user interests for ranking (e.g., ["music", "live"])
            llm_provider: LLM provider for tool selection (google, openai, anthropic)

        Returns:
            Dictionary with:
            - status: "success" or "error"
            - events: List of top 5 events
            - total_found: Total events discovered
            - tools_used: List of tools that returned results
            - reasoning: Why these tools were selected
            - error: Error message (only when status is "error")
        """
        nonlocal orchestrator_instance
        location = f"{city}, {country}"
        try:
            # Explicit None check: the truthiness of an EventOrchestrator
            # object must not decide whether it gets re-created.
            if orchestrator_instance is None:
                # Pull gateway URL from Modal secret (required in Modal)
                gateway_url = os.getenv("GATEWAY_URL")
                if not gateway_url:
                    logger.error("❌ GATEWAY_URL not set in Modal secrets (gateway-secret)")
                    raise ValueError(
                        "GATEWAY_URL must be set in Modal secrets. "
                        "Run: modal secret create gateway-secret GATEWAY_URL=<url>"
                    )
                orchestrator_instance = EventOrchestrator(
                    gateway_url=gateway_url,
                    llm_client=None,
                )
                logger.info("🎯 Orchestrator initialized with gateway: %s", gateway_url)

            logger.info("🔍 Finding events: '%s' in %s, %s", query, city, country)

            # Normalise optional inputs so the orchestrator always receives a
            # list and a string, even if a client sends explicit nulls.
            result = await orchestrator_instance.find_events(
                query=query,
                city=city,
                country=country,
                date_range=date_range or "",
                interests=interests if interests is not None else [],
                llm_provider=llm_provider,
            )

            events = result.get("events", [])
            response = {
                "status": "success",
                "query": query,
                "location": location,
                "events": events,
                "total_found": result.get("total_found", 0),
                "tools_used": result.get("tools_used", []),
                "reasoning": result.get("reasoning", ""),
                "execution_time": result.get("execution_time", 0),
            }
            logger.info("✅ Found %s events", len(events))
            return response
        except Exception as e:
            # Tool boundary: report the failure to the MCP client as data
            # instead of propagating. The error payload carries the same keys
            # as the success payload (plus "error") so clients can read either
            # shape uniformly.
            logger.error(f"Error finding events: {e}", exc_info=True)
            return {
                "status": "error",
                "error": str(e),
                "events": [],
                "total_found": 0,
                "tools_used": [],
                "reasoning": "",
                "execution_time": 0,
                "query": query,
                "location": location,
            }

    @mcp.tool()
    async def get_orchestrator_status() -> dict:
        """
        Get the current status of the Event Orchestrator.

        Returns information about:
        - Available tools
        - Gateway connectivity
        - API key configuration
        """
        try:
            # Pull secrets from Modal (required configuration)
            gateway_url = os.getenv("GATEWAY_URL")

            # Probe the gateway only when a URL is configured; requesting the
            # "NOT_CONFIGURED" placeholder could never succeed.
            gateway_up = False
            if gateway_url:
                # Strip trailing slashes so the probe path is not "//health".
                health_url = f"{gateway_url.rstrip('/')}/health"
                try:
                    async with httpx.AsyncClient(timeout=5.0) as client:
                        response = await client.get(health_url)
                    gateway_up = response.status_code == 200
                except Exception as probe_error:
                    # Best-effort check: any failure just means "not up".
                    # `Exception` (not a bare except) lets task cancellation
                    # and KeyboardInterrupt propagate.
                    logger.warning("Gateway health check failed: %s", probe_error)

            # Response key -> environment variable holding that provider's key.
            key_env_vars = {
                "jina": "JINA_API_KEY",
                "google": "GOOGLE_API_KEY",
                "openai": "OPENAI_API_KEY",
                "anthropic": "ANTHROPIC_API_KEY",
                "brave": "BRAVE_API_KEY",
            }

            return {
                "status": "operational",
                "gateway": {
                    "url": gateway_url or "NOT_CONFIGURED",
                    "up": gateway_up,
                },
                "tools": [
                    "web-search (Brave)",
                    "gemini-search (Google)",
                    "eventbrite",
                    "ticketmaster",
                    "ultimate_event_scraper",
                    "jina-ai-mcp",
                ],
                # Only presence is reported; key values are never returned.
                "api_keys": {
                    name: "✓" if os.getenv(env_var) else "✗"
                    for name, env_var in key_env_vars.items()
                },
            }
        except Exception as e:
            logger.error(f"Error getting status: {e}", exc_info=True)
            return {
                "status": "error",
                "error": str(e),
            }

    @mcp.tool()
    async def get_available_tools() -> dict:
        """
        Get information about all available event discovery tools.

        Returns details about:
        - Tool names and capabilities
        - Whether each tool requires an API key
        - Supported LLM providers and countries
        """
        # Static catalogue; a fresh dict is built on every call so callers
        # can mutate the result freely.
        return {
            "tools": [
                {
                    "name": "web-search",
                    "provider": "Brave",
                    "type": "General web search",
                    "capabilities": ["search", "ranking"],
                    "requires_api_key": True,
                },
                {
                    "name": "gemini-search",
                    "provider": "Google Gemini AI",
                    "type": "AI-based semantic search",
                    "capabilities": ["semantic_search", "analysis"],
                    "requires_api_key": True,
                },
                {
                    "name": "eventbrite",
                    "provider": "Eventbrite Platform",
                    "type": "Event platform scraper",
                    "capabilities": ["event_discovery", "ticketing"],
                    "requires_api_key": False,
                },
                {
                    "name": "ticketmaster",
                    "provider": "Ticketmaster Platform",
                    "type": "Ticketing platform scraper",
                    "capabilities": ["event_discovery", "ticketing"],
                    "requires_api_key": False,
                },
                {
                    "name": "ultimate_event_scraper",
                    "provider": "Custom Web Scraper",
                    "type": "Advanced web crawler with Playwright",
                    "capabilities": ["web_scraping", "js_rendering"],
                    "requires_api_key": False,
                },
                {
                    "name": "jina-ai-mcp",
                    "provider": "Jina AI",
                    "type": "Content extraction and ranking",
                    "capabilities": ["web_scraping", "ranking", "extraction"],
                    "requires_api_key": True,
                },
            ],
            "llm_providers": ["google", "openai", "anthropic"],
            "max_events_per_search": 5,
            "supported_countries": ["USA", "Australia", "UK", "Canada"],
        }

    logger.info("✅ Event Orchestrator MCP Server initialized with 3 tools")
    return mcp
# ============================================================================
# Entry Point (for local testing)
# ============================================================================
if __name__ == "__main__":
    # Local testing only: serve the MCP server over streamable HTTP on
    # all interfaces, port 8000.
    import uvicorn

    mcp = make_mcp_server()

    # Build the ASGI app first, then hand it to uvicorn.
    app = mcp.http_app(transport="streamable-http", stateless_http=True)
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="info")