Spaces:
Sleeping
Sleeping
| # file: mcp/registry.py | |
| import asyncio | |
| import aiohttp | |
| import os | |
| from typing import Dict, Any | |
| from fastapi.encoders import jsonable_encoder | |
| from app.config import ( | |
| MCP_SEARCH_PORT, MCP_EMAIL_PORT, | |
| MCP_CALENDAR_PORT, MCP_STORE_PORT | |
| ) | |
| # Check if running in in-memory mode (for HF Spaces) | |
| USE_IN_MEMORY_MODE = os.getenv("USE_IN_MEMORY_MCP", "true").lower() == "true" | |
| class MCPClient: | |
| """Base MCP client for server communication""" | |
| def __init__(self, base_url: str): | |
| self.base_url = base_url | |
| self.session = None | |
| async def connect(self): | |
| """Initialize connection""" | |
| if not self.session: | |
| self.session = aiohttp.ClientSession() | |
| async def close(self): | |
| """Close connection""" | |
| if self.session: | |
| await self.session.close() | |
| async def call(self, method: str, params: Dict[str, Any] = None): | |
| """Call MCP method""" | |
| if not self.session: | |
| await self.connect() | |
| # Ensure payload is JSON-serializable (handles datetimes and Pydantic models) | |
| payload = {"method": method, "params": params or {}} | |
| safe_payload = jsonable_encoder(payload) | |
| async with self.session.post( | |
| f"{self.base_url}/rpc", | |
| json=safe_payload | |
| ) as response: | |
| result = await response.json() | |
| return result.get("result") | |
| class SearchClient(MCPClient): | |
| """Search MCP client""" | |
| async def query(self, q: str): | |
| return await self.call("search.query", {"q": q}) | |
| class EmailClient(MCPClient): | |
| """Email MCP client""" | |
| async def send(self, to: str, subject: str, body: str): | |
| return await self.call("email.send", { | |
| "to": to, "subject": subject, "body": body | |
| }) | |
| async def get_thread(self, prospect_id: str): | |
| return await self.call("email.thread", {"prospect_id": prospect_id}) | |
| class CalendarClient(MCPClient): | |
| """Calendar MCP client""" | |
| async def suggest_slots(self): | |
| return await self.call("calendar.suggest_slots") | |
| async def generate_ics(self, summary: str, start_iso: str, end_iso: str): | |
| return await self.call("calendar.generate_ics", { | |
| "summary": summary, | |
| "start_iso": start_iso, | |
| "end_iso": end_iso | |
| }) | |
| class StoreClient(MCPClient): | |
| """Store MCP client""" | |
| async def save_prospect(self, prospect): | |
| return await self.call("store.save_prospect", {"prospect": prospect.dict()}) | |
| async def get_prospect(self, prospect_id: str): | |
| result = await self.call("store.get_prospect", {"id": prospect_id}) | |
| if result: | |
| from app.schema import Prospect | |
| return Prospect(**result) | |
| async def list_prospects(self): | |
| results = await self.call("store.list_prospects") | |
| from app.schema import Prospect | |
| return [Prospect(**p) for p in results] | |
| async def save_company(self, company): | |
| return await self.call("store.save_company", {"company": company}) | |
| async def get_company(self, company_id: str): | |
| result = await self.call("store.get_company", {"id": company_id}) | |
| if result: | |
| from app.schema import Company | |
| return Company(**result) | |
| async def save_fact(self, fact): | |
| return await self.call("store.save_fact", {"fact": fact.dict()}) | |
| async def save_contact(self, contact): | |
| return await self.call("store.save_contact", {"contact": contact.dict()}) | |
| async def list_contacts_by_domain(self, domain: str): | |
| results = await self.call("store.list_contacts_by_domain", {"domain": domain}) | |
| from app.schema import Contact | |
| return [Contact(**c) for c in results] | |
| async def check_suppression(self, type: str, value: str): | |
| return await self.call("store.check_suppression", {"type": type, "value": value}) | |
| async def save_handoff(self, packet): | |
| return await self.call("store.save_handoff", {"packet": packet.dict()}) | |
| async def clear_all(self): | |
| return await self.call("store.clear_all") | |
| class MCPRegistry: | |
| """ | |
| Central registry for all MCP clients | |
| Supports two modes: | |
| - HTTP mode: Connects to separate MCP server processes (local development) | |
| - In-memory mode: Uses in-memory services (HF Spaces deployment) | |
| """ | |
| def __init__(self, use_in_memory: bool = None): | |
| self.use_in_memory = use_in_memory if use_in_memory is not None else USE_IN_MEMORY_MODE | |
| if self.use_in_memory: | |
| # In-memory mode for HF Spaces | |
| from mcp.in_memory_clients import ( | |
| InMemorySearchClient, | |
| InMemoryEmailClient, | |
| InMemoryCalendarClient, | |
| InMemoryStoreClient | |
| ) | |
| from mcp.productivity_services import ( | |
| get_analytics_service, | |
| get_enrichment_service, | |
| get_validation_service, | |
| get_summary_service | |
| ) | |
| self.search = InMemorySearchClient() | |
| self.email = InMemoryEmailClient() | |
| self.calendar = InMemoryCalendarClient() | |
| self.store = InMemoryStoreClient() | |
| # New productivity services | |
| self.analytics = get_analytics_service() | |
| self.enrichment = get_enrichment_service() | |
| self.validation = get_validation_service() | |
| self.summary = get_summary_service() | |
| print("MCP Registry: Using in-memory services (HF Spaces mode)") | |
| print("MCP Registry: Loaded productivity services (Analytics, Enrichment, Validation, Summary)") | |
| else: | |
| # HTTP mode for local development | |
| self.search = SearchClient(f"http://localhost:{MCP_SEARCH_PORT}") | |
| self.email = EmailClient(f"http://localhost:{MCP_EMAIL_PORT}") | |
| self.calendar = CalendarClient(f"http://localhost:{MCP_CALENDAR_PORT}") | |
| self.store = StoreClient(f"http://localhost:{MCP_STORE_PORT}") | |
| print("MCP Registry: Using HTTP clients (local development mode)") | |
| async def connect(self): | |
| """Connect all clients""" | |
| if not self.use_in_memory: | |
| # Only HTTP clients need connection | |
| await self.search.connect() | |
| await self.email.connect() | |
| await self.calendar.connect() | |
| await self.store.connect() | |
| else: | |
| # In-memory services don't need connection | |
| print("MCP Registry: In-memory services ready (no connection needed)") | |
| async def health_check(self): | |
| """Check health of all MCP servers""" | |
| status = {} | |
| if self.use_in_memory: | |
| # In-memory services are always healthy | |
| status = { | |
| "search": "healthy (in-memory)", | |
| "email": "healthy (in-memory)", | |
| "calendar": "healthy (in-memory)", | |
| "store": "healthy (in-memory)", | |
| "analytics": "healthy (in-memory)", | |
| "enrichment": "healthy (in-memory)", | |
| "validation": "healthy (in-memory)", | |
| "summary": "healthy (in-memory)" | |
| } | |
| else: | |
| # Check HTTP servers | |
| for name, client in [ | |
| ("search", self.search), | |
| ("email", self.email), | |
| ("calendar", self.calendar), | |
| ("store", self.store) | |
| ]: | |
| try: | |
| await client.call("health") | |
| status[name] = "healthy" | |
| except Exception as e: | |
| status[name] = f"unhealthy: {str(e)}" | |
| return status | |
| def get_search_client(self): | |
| return self.search | |
| def get_email_client(self): | |
| return self.email | |
| def get_calendar_client(self): | |
| return self.calendar | |
| def get_store_client(self): | |
| return self.store | |
| def get_analytics_service(self): | |
| """Get analytics service (only available in in-memory mode)""" | |
| return getattr(self, 'analytics', None) | |
| def get_enrichment_service(self): | |
| """Get enrichment service (only available in in-memory mode)""" | |
| return getattr(self, 'enrichment', None) | |
| def get_validation_service(self): | |
| """Get validation service (only available in in-memory mode)""" | |
| return getattr(self, 'validation', None) | |
| def get_summary_service(self): | |
| """Get summary service (only available in in-memory mode)""" | |
| return getattr(self, 'summary', None) | |
| # Global registry instance | |
| _registry_instance = None | |
| def get_mcp_registry(use_in_memory: bool = None) -> MCPRegistry: | |
| """ | |
| Get or create the global MCP registry instance. | |
| Args: | |
| use_in_memory: If True, use in-memory services. If None, use env var. | |
| Returns: | |
| MCPRegistry instance | |
| """ | |
| global _registry_instance | |
| if _registry_instance is None: | |
| _registry_instance = MCPRegistry(use_in_memory=use_in_memory) | |
| return _registry_instance | |