cx_ai_agent_v1 / mcp /registry.py
muzakkirhussain011's picture
Add application files (text files only)
8bab08d
# file: mcp/registry.py
import asyncio
import aiohttp
import os
from typing import Dict, Any
from fastapi.encoders import jsonable_encoder
from app.config import (
MCP_SEARCH_PORT, MCP_EMAIL_PORT,
MCP_CALENDAR_PORT, MCP_STORE_PORT
)
# Check if running in in-memory mode (for HF Spaces)
USE_IN_MEMORY_MODE = os.getenv("USE_IN_MEMORY_MCP", "true").lower() == "true"
class MCPClient:
"""Base MCP client for server communication"""
def __init__(self, base_url: str):
self.base_url = base_url
self.session = None
async def connect(self):
"""Initialize connection"""
if not self.session:
self.session = aiohttp.ClientSession()
async def close(self):
"""Close connection"""
if self.session:
await self.session.close()
async def call(self, method: str, params: Dict[str, Any] = None):
"""Call MCP method"""
if not self.session:
await self.connect()
# Ensure payload is JSON-serializable (handles datetimes and Pydantic models)
payload = {"method": method, "params": params or {}}
safe_payload = jsonable_encoder(payload)
async with self.session.post(
f"{self.base_url}/rpc",
json=safe_payload
) as response:
result = await response.json()
return result.get("result")
class SearchClient(MCPClient):
"""Search MCP client"""
async def query(self, q: str):
return await self.call("search.query", {"q": q})
class EmailClient(MCPClient):
"""Email MCP client"""
async def send(self, to: str, subject: str, body: str):
return await self.call("email.send", {
"to": to, "subject": subject, "body": body
})
async def get_thread(self, prospect_id: str):
return await self.call("email.thread", {"prospect_id": prospect_id})
class CalendarClient(MCPClient):
"""Calendar MCP client"""
async def suggest_slots(self):
return await self.call("calendar.suggest_slots")
async def generate_ics(self, summary: str, start_iso: str, end_iso: str):
return await self.call("calendar.generate_ics", {
"summary": summary,
"start_iso": start_iso,
"end_iso": end_iso
})
class StoreClient(MCPClient):
"""Store MCP client"""
async def save_prospect(self, prospect):
return await self.call("store.save_prospect", {"prospect": prospect.dict()})
async def get_prospect(self, prospect_id: str):
result = await self.call("store.get_prospect", {"id": prospect_id})
if result:
from app.schema import Prospect
return Prospect(**result)
async def list_prospects(self):
results = await self.call("store.list_prospects")
from app.schema import Prospect
return [Prospect(**p) for p in results]
async def save_company(self, company):
return await self.call("store.save_company", {"company": company})
async def get_company(self, company_id: str):
result = await self.call("store.get_company", {"id": company_id})
if result:
from app.schema import Company
return Company(**result)
async def save_fact(self, fact):
return await self.call("store.save_fact", {"fact": fact.dict()})
async def save_contact(self, contact):
return await self.call("store.save_contact", {"contact": contact.dict()})
async def list_contacts_by_domain(self, domain: str):
results = await self.call("store.list_contacts_by_domain", {"domain": domain})
from app.schema import Contact
return [Contact(**c) for c in results]
async def check_suppression(self, type: str, value: str):
return await self.call("store.check_suppression", {"type": type, "value": value})
async def save_handoff(self, packet):
return await self.call("store.save_handoff", {"packet": packet.dict()})
async def clear_all(self):
return await self.call("store.clear_all")
class MCPRegistry:
"""
Central registry for all MCP clients
Supports two modes:
- HTTP mode: Connects to separate MCP server processes (local development)
- In-memory mode: Uses in-memory services (HF Spaces deployment)
"""
def __init__(self, use_in_memory: bool = None):
self.use_in_memory = use_in_memory if use_in_memory is not None else USE_IN_MEMORY_MODE
if self.use_in_memory:
# In-memory mode for HF Spaces
from mcp.in_memory_clients import (
InMemorySearchClient,
InMemoryEmailClient,
InMemoryCalendarClient,
InMemoryStoreClient
)
from mcp.productivity_services import (
get_analytics_service,
get_enrichment_service,
get_validation_service,
get_summary_service
)
self.search = InMemorySearchClient()
self.email = InMemoryEmailClient()
self.calendar = InMemoryCalendarClient()
self.store = InMemoryStoreClient()
# New productivity services
self.analytics = get_analytics_service()
self.enrichment = get_enrichment_service()
self.validation = get_validation_service()
self.summary = get_summary_service()
print("MCP Registry: Using in-memory services (HF Spaces mode)")
print("MCP Registry: Loaded productivity services (Analytics, Enrichment, Validation, Summary)")
else:
# HTTP mode for local development
self.search = SearchClient(f"http://localhost:{MCP_SEARCH_PORT}")
self.email = EmailClient(f"http://localhost:{MCP_EMAIL_PORT}")
self.calendar = CalendarClient(f"http://localhost:{MCP_CALENDAR_PORT}")
self.store = StoreClient(f"http://localhost:{MCP_STORE_PORT}")
print("MCP Registry: Using HTTP clients (local development mode)")
async def connect(self):
"""Connect all clients"""
if not self.use_in_memory:
# Only HTTP clients need connection
await self.search.connect()
await self.email.connect()
await self.calendar.connect()
await self.store.connect()
else:
# In-memory services don't need connection
print("MCP Registry: In-memory services ready (no connection needed)")
async def health_check(self):
"""Check health of all MCP servers"""
status = {}
if self.use_in_memory:
# In-memory services are always healthy
status = {
"search": "healthy (in-memory)",
"email": "healthy (in-memory)",
"calendar": "healthy (in-memory)",
"store": "healthy (in-memory)",
"analytics": "healthy (in-memory)",
"enrichment": "healthy (in-memory)",
"validation": "healthy (in-memory)",
"summary": "healthy (in-memory)"
}
else:
# Check HTTP servers
for name, client in [
("search", self.search),
("email", self.email),
("calendar", self.calendar),
("store", self.store)
]:
try:
await client.call("health")
status[name] = "healthy"
except Exception as e:
status[name] = f"unhealthy: {str(e)}"
return status
def get_search_client(self):
return self.search
def get_email_client(self):
return self.email
def get_calendar_client(self):
return self.calendar
def get_store_client(self):
return self.store
def get_analytics_service(self):
"""Get analytics service (only available in in-memory mode)"""
return getattr(self, 'analytics', None)
def get_enrichment_service(self):
"""Get enrichment service (only available in in-memory mode)"""
return getattr(self, 'enrichment', None)
def get_validation_service(self):
"""Get validation service (only available in in-memory mode)"""
return getattr(self, 'validation', None)
def get_summary_service(self):
"""Get summary service (only available in in-memory mode)"""
return getattr(self, 'summary', None)
# Global registry instance
_registry_instance = None
def get_mcp_registry(use_in_memory: bool = None) -> MCPRegistry:
"""
Get or create the global MCP registry instance.
Args:
use_in_memory: If True, use in-memory services. If None, use env var.
Returns:
MCPRegistry instance
"""
global _registry_instance
if _registry_instance is None:
_registry_instance = MCPRegistry(use_in_memory=use_in_memory)
return _registry_instance