# file: mcp/registry.py
"""Client classes and a central registry for the MCP services.

Two deployment modes are supported:

- HTTP mode: each MCP service runs as a separate local process and is
  reached through ``POST {base_url}/rpc``.
- In-memory mode: services live in-process (selected by default through the
  ``USE_IN_MEMORY_MCP`` environment variable).
"""
import asyncio
import aiohttp
import os
from typing import Any, Dict, Optional
from fastapi.encoders import jsonable_encoder
from app.config import (
    MCP_SEARCH_PORT, MCP_EMAIL_PORT, MCP_CALENDAR_PORT, MCP_STORE_PORT
)

# Check if running in in-memory mode (for HF Spaces)
USE_IN_MEMORY_MODE = os.getenv("USE_IN_MEMORY_MCP", "true").lower() == "true"


class MCPClient:
    """Base MCP client for server communication."""

    def __init__(self, base_url: str):
        self.base_url = base_url
        # Created lazily by connect(); reset to None by close().
        self.session: Optional[aiohttp.ClientSession] = None

    async def connect(self):
        """Open the HTTP session if there is none or the old one was closed."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()

    async def close(self):
        """Close the HTTP session and forget it.

        Dropping the reference lets a later ``call()`` reconnect instead of
        trying to reuse a closed session.
        """
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def call(self, method: str, params: Optional[Dict[str, Any]] = None):
        """Call an MCP method over the ``/rpc`` endpoint.

        Args:
            method: Name of the remote method, e.g. ``"search.query"``.
            params: Method parameters; ``None`` is sent as an empty dict.

        Returns:
            The value stored under ``"result"`` in the JSON response, or
            ``None`` when the response has no such key.
        """
        if self.session is None or self.session.closed:
            await self.connect()

        # Ensure payload is JSON-serializable (handles datetimes and
        # Pydantic models).
        payload = {"method": method, "params": params or {}}
        safe_payload = jsonable_encoder(payload)

        async with self.session.post(
            f"{self.base_url}/rpc",
            json=safe_payload
        ) as response:
            result = await response.json()
            return result.get("result")


class SearchClient(MCPClient):
    """Search MCP client."""

    async def query(self, q: str):
        """Invoke ``search.query`` with the query string ``q``."""
        return await self.call("search.query", {"q": q})


class EmailClient(MCPClient):
    """Email MCP client."""

    async def send(self, to: str, subject: str, body: str):
        """Invoke ``email.send`` with recipient, subject and body."""
        return await self.call("email.send", {
            "to": to,
            "subject": subject,
            "body": body
        })

    async def get_thread(self, prospect_id: str):
        """Invoke ``email.thread`` for the given prospect id."""
        return await self.call("email.thread", {"prospect_id": prospect_id})


class CalendarClient(MCPClient):
    """Calendar MCP client."""

    async def suggest_slots(self):
        """Invoke ``calendar.suggest_slots`` (takes no parameters)."""
        return await self.call("calendar.suggest_slots")

    async def generate_ics(self, summary: str, start_iso: str, end_iso: str):
        """Invoke ``calendar.generate_ics`` with a summary and start/end values."""
        return await self.call("calendar.generate_ics", {
            "summary": summary,
            "start_iso": start_iso,
            "end_iso": end_iso
        })


class StoreClient(MCPClient):
    """Store MCP client.

    Schema classes are imported inside the methods rather than at module
    level, as in the original code (presumably to avoid an import cycle with
    ``app.schema`` -- TODO confirm).
    """

    async def save_prospect(self, prospect):
        """Invoke ``store.save_prospect`` with ``prospect.dict()``."""
        return await self.call("store.save_prospect", {"prospect": prospect.dict()})

    async def get_prospect(self, prospect_id: str):
        """Return the stored ``Prospect``, or ``None`` if the result is empty."""
        result = await self.call("store.get_prospect", {"id": prospect_id})
        if result:
            from app.schema import Prospect
            return Prospect(**result)
        return None

    async def list_prospects(self):
        """Return all stored prospects as ``Prospect`` objects.

        A missing result (``None``) is treated as an empty list instead of
        raising ``TypeError`` while iterating.
        """
        results = await self.call("store.list_prospects")
        from app.schema import Prospect
        return [Prospect(**p) for p in results or []]

    async def save_company(self, company):
        """Invoke ``store.save_company``; ``company`` is sent as given."""
        return await self.call("store.save_company", {"company": company})

    async def get_company(self, company_id: str):
        """Return the stored ``Company``, or ``None`` if the result is empty."""
        result = await self.call("store.get_company", {"id": company_id})
        if result:
            from app.schema import Company
            return Company(**result)
        return None

    async def save_fact(self, fact):
        """Invoke ``store.save_fact`` with ``fact.dict()``."""
        return await self.call("store.save_fact", {"fact": fact.dict()})

    async def save_contact(self, contact):
        """Invoke ``store.save_contact`` with ``contact.dict()``."""
        return await self.call("store.save_contact", {"contact": contact.dict()})

    async def list_contacts_by_domain(self, domain: str):
        """Return the contacts stored for ``domain`` as ``Contact`` objects.

        A missing result (``None``) is treated as an empty list instead of
        raising ``TypeError`` while iterating.
        """
        results = await self.call("store.list_contacts_by_domain", {"domain": domain})
        from app.schema import Contact
        return [Contact(**c) for c in results or []]

    # NOTE: ``type`` shadows the builtin, but it is part of the public
    # signature (callers may pass it by keyword), so the name is kept.
    async def check_suppression(self, type: str, value: str):
        """Invoke ``store.check_suppression`` for a type/value pair."""
        return await self.call("store.check_suppression", {"type": type, "value": value})

    async def save_handoff(self, packet):
        """Invoke ``store.save_handoff`` with ``packet.dict()``."""
        return await self.call("store.save_handoff", {"packet": packet.dict()})

    async def clear_all(self):
        """Invoke ``store.clear_all`` (takes no parameters)."""
        return await self.call("store.clear_all")


class MCPRegistry:
    """
    Central registry for all MCP clients

    Supports two modes:
    - HTTP mode: Connects to separate MCP server processes (local development)
    - In-memory mode: Uses in-memory services (HF Spaces deployment)
    """

    def __init__(self, use_in_memory: Optional[bool] = None):
        """Build the clients for the selected mode.

        Args:
            use_in_memory: ``True`` for in-memory services, ``False`` for
                HTTP clients, ``None`` to follow ``USE_IN_MEMORY_MODE``.
        """
        self.use_in_memory = use_in_memory if use_in_memory is not None else USE_IN_MEMORY_MODE

        # Productivity services exist only in in-memory mode; default them to
        # None so the attributes are defined in both modes.
        self.analytics = None
        self.enrichment = None
        self.validation = None
        self.summary = None

        if self.use_in_memory:
            # In-memory mode for HF Spaces
            from mcp.in_memory_clients import (
                InMemorySearchClient,
                InMemoryEmailClient,
                InMemoryCalendarClient,
                InMemoryStoreClient
            )
            from mcp.productivity_services import (
                get_analytics_service,
                get_enrichment_service,
                get_validation_service,
                get_summary_service
            )
            self.search = InMemorySearchClient()
            self.email = InMemoryEmailClient()
            self.calendar = InMemoryCalendarClient()
            self.store = InMemoryStoreClient()

            # New productivity services
            self.analytics = get_analytics_service()
            self.enrichment = get_enrichment_service()
            self.validation = get_validation_service()
            self.summary = get_summary_service()

            print("MCP Registry: Using in-memory services (HF Spaces mode)")
            print("MCP Registry: Loaded productivity services (Analytics, Enrichment, Validation, Summary)")
        else:
            # HTTP mode for local development
            self.search = SearchClient(f"http://localhost:{MCP_SEARCH_PORT}")
            self.email = EmailClient(f"http://localhost:{MCP_EMAIL_PORT}")
            self.calendar = CalendarClient(f"http://localhost:{MCP_CALENDAR_PORT}")
            self.store = StoreClient(f"http://localhost:{MCP_STORE_PORT}")
            print("MCP Registry: Using HTTP clients (local development mode)")

    async def connect(self):
        """Connect all clients"""
        if not self.use_in_memory:
            # Only HTTP clients need connection
            await self.search.connect()
            await self.email.connect()
            await self.calendar.connect()
            await self.store.connect()
        else:
            # In-memory services don't need connection
            print("MCP Registry: In-memory services ready (no connection needed)")

    async def close(self):
        """Close the HTTP sessions of all clients.

        Does nothing in in-memory mode, where no sessions are opened by
        ``connect()``.
        """
        if self.use_in_memory:
            return
        for client in (self.search, self.email, self.calendar, self.store):
            await client.close()

    async def health_check(self):
        """Check health of all MCP servers.

        Returns:
            Dict mapping service name to a status string. In HTTP mode a
            service is "healthy" when its ``health`` call raises nothing,
            otherwise ``"unhealthy: <error>"``.
        """
        status = {}

        if self.use_in_memory:
            # In-memory services are always healthy
            status = {
                "search": "healthy (in-memory)",
                "email": "healthy (in-memory)",
                "calendar": "healthy (in-memory)",
                "store": "healthy (in-memory)",
                "analytics": "healthy (in-memory)",
                "enrichment": "healthy (in-memory)",
                "validation": "healthy (in-memory)",
                "summary": "healthy (in-memory)"
            }
        else:
            # Check HTTP servers
            for name, client in [
                ("search", self.search),
                ("email", self.email),
                ("calendar", self.calendar),
                ("store", self.store)
            ]:
                try:
                    await client.call("health")
                except Exception as e:
                    # Best-effort probe: any failure is reported, not raised.
                    status[name] = f"unhealthy: {str(e)}"
                else:
                    status[name] = "healthy"

        return status

    def get_search_client(self):
        """Return the search client."""
        return self.search

    def get_email_client(self):
        """Return the email client."""
        return self.email

    def get_calendar_client(self):
        """Return the calendar client."""
        return self.calendar

    def get_store_client(self):
        """Return the store client."""
        return self.store

    # The getters below keep getattr() so that instances created without
    # running __init__ still return None instead of raising.
    def get_analytics_service(self):
        """Get analytics service (only available in in-memory mode)"""
        return getattr(self, 'analytics', None)

    def get_enrichment_service(self):
        """Get enrichment service (only available in in-memory mode)"""
        return getattr(self, 'enrichment', None)

    def get_validation_service(self):
        """Get validation service (only available in in-memory mode)"""
        return getattr(self, 'validation', None)

    def get_summary_service(self):
        """Get summary service (only available in in-memory mode)"""
        return getattr(self, 'summary', None)


# Global registry instance
_registry_instance = None


def get_mcp_registry(use_in_memory: Optional[bool] = None) -> MCPRegistry:
    """
    Get or create the global MCP registry instance.

    Args:
        use_in_memory: If True, use in-memory services. If None, use env var.
            Only honoured on the call that creates the instance; later calls
            return the existing registry regardless of this argument.

    Returns:
        MCPRegistry instance
    """
    global _registry_instance
    if _registry_instance is None:
        _registry_instance = MCPRegistry(use_in_memory=use_in_memory)
    return _registry_instance