|
|
|
|
|
import asyncio |
|
|
import aiohttp |
|
|
import os |
|
|
from typing import Dict, Any |
|
|
from fastapi.encoders import jsonable_encoder |
|
|
from app.config import ( |
|
|
MCP_SEARCH_PORT, MCP_EMAIL_PORT, |
|
|
MCP_CALENDAR_PORT, MCP_STORE_PORT |
|
|
) |
|
|
|
|
|
|
|
|
USE_IN_MEMORY_MODE = os.getenv("USE_IN_MEMORY_MCP", "true").lower() == "true" |
|
|
|
|
|
class MCPClient: |
|
|
"""Base MCP client for server communication""" |
|
|
|
|
|
def __init__(self, base_url: str): |
|
|
self.base_url = base_url |
|
|
self.session = None |
|
|
|
|
|
async def connect(self): |
|
|
"""Initialize connection""" |
|
|
if not self.session: |
|
|
self.session = aiohttp.ClientSession() |
|
|
|
|
|
async def close(self): |
|
|
"""Close connection""" |
|
|
if self.session: |
|
|
await self.session.close() |
|
|
|
|
|
async def call(self, method: str, params: Dict[str, Any] = None): |
|
|
"""Call MCP method""" |
|
|
if not self.session: |
|
|
await self.connect() |
|
|
|
|
|
payload = {"method": method, "params": params or {}} |
|
|
safe_payload = jsonable_encoder(payload) |
|
|
|
|
|
async with self.session.post( |
|
|
f"{self.base_url}/rpc", |
|
|
json=safe_payload |
|
|
) as response: |
|
|
result = await response.json() |
|
|
return result.get("result") |
|
|
|
|
|
class SearchClient(MCPClient): |
|
|
"""Search MCP client""" |
|
|
|
|
|
async def query(self, q: str): |
|
|
return await self.call("search.query", {"q": q}) |
|
|
|
|
|
class EmailClient(MCPClient): |
|
|
"""Email MCP client""" |
|
|
|
|
|
async def send(self, to: str, subject: str, body: str): |
|
|
return await self.call("email.send", { |
|
|
"to": to, "subject": subject, "body": body |
|
|
}) |
|
|
|
|
|
async def get_thread(self, prospect_id: str): |
|
|
return await self.call("email.thread", {"prospect_id": prospect_id}) |
|
|
|
|
|
class CalendarClient(MCPClient): |
|
|
"""Calendar MCP client""" |
|
|
|
|
|
async def suggest_slots(self): |
|
|
return await self.call("calendar.suggest_slots") |
|
|
|
|
|
async def generate_ics(self, summary: str, start_iso: str, end_iso: str): |
|
|
return await self.call("calendar.generate_ics", { |
|
|
"summary": summary, |
|
|
"start_iso": start_iso, |
|
|
"end_iso": end_iso |
|
|
}) |
|
|
|
|
|
class StoreClient(MCPClient): |
|
|
"""Store MCP client""" |
|
|
|
|
|
async def save_prospect(self, prospect): |
|
|
return await self.call("store.save_prospect", {"prospect": prospect.dict()}) |
|
|
|
|
|
async def get_prospect(self, prospect_id: str): |
|
|
result = await self.call("store.get_prospect", {"id": prospect_id}) |
|
|
if result: |
|
|
from app.schema import Prospect |
|
|
return Prospect(**result) |
|
|
|
|
|
async def list_prospects(self): |
|
|
results = await self.call("store.list_prospects") |
|
|
from app.schema import Prospect |
|
|
return [Prospect(**p) for p in results] |
|
|
|
|
|
async def save_company(self, company): |
|
|
return await self.call("store.save_company", {"company": company}) |
|
|
|
|
|
async def get_company(self, company_id: str): |
|
|
result = await self.call("store.get_company", {"id": company_id}) |
|
|
if result: |
|
|
from app.schema import Company |
|
|
return Company(**result) |
|
|
|
|
|
async def save_fact(self, fact): |
|
|
return await self.call("store.save_fact", {"fact": fact.dict()}) |
|
|
|
|
|
async def save_contact(self, contact): |
|
|
return await self.call("store.save_contact", {"contact": contact.dict()}) |
|
|
|
|
|
async def list_contacts_by_domain(self, domain: str): |
|
|
results = await self.call("store.list_contacts_by_domain", {"domain": domain}) |
|
|
from app.schema import Contact |
|
|
return [Contact(**c) for c in results] |
|
|
|
|
|
async def check_suppression(self, type: str, value: str): |
|
|
return await self.call("store.check_suppression", {"type": type, "value": value}) |
|
|
|
|
|
async def save_handoff(self, packet): |
|
|
return await self.call("store.save_handoff", {"packet": packet.dict()}) |
|
|
|
|
|
async def clear_all(self): |
|
|
return await self.call("store.clear_all") |
|
|
|
|
|
class MCPRegistry: |
|
|
""" |
|
|
Central registry for all MCP clients |
|
|
|
|
|
Supports two modes: |
|
|
- HTTP mode: Connects to separate MCP server processes (local development) |
|
|
- In-memory mode: Uses in-memory services (HF Spaces deployment) |
|
|
""" |
|
|
|
|
|
def __init__(self, use_in_memory: bool = None): |
|
|
self.use_in_memory = use_in_memory if use_in_memory is not None else USE_IN_MEMORY_MODE |
|
|
|
|
|
if self.use_in_memory: |
|
|
|
|
|
from mcp.in_memory_clients import ( |
|
|
InMemorySearchClient, |
|
|
InMemoryEmailClient, |
|
|
InMemoryCalendarClient, |
|
|
InMemoryStoreClient |
|
|
) |
|
|
from mcp.productivity_services import ( |
|
|
get_analytics_service, |
|
|
get_enrichment_service, |
|
|
get_validation_service, |
|
|
get_summary_service |
|
|
) |
|
|
self.search = InMemorySearchClient() |
|
|
self.email = InMemoryEmailClient() |
|
|
self.calendar = InMemoryCalendarClient() |
|
|
self.store = InMemoryStoreClient() |
|
|
|
|
|
self.analytics = get_analytics_service() |
|
|
self.enrichment = get_enrichment_service() |
|
|
self.validation = get_validation_service() |
|
|
self.summary = get_summary_service() |
|
|
print("MCP Registry: Using in-memory services (HF Spaces mode)") |
|
|
print("MCP Registry: Loaded productivity services (Analytics, Enrichment, Validation, Summary)") |
|
|
else: |
|
|
|
|
|
self.search = SearchClient(f"http://localhost:{MCP_SEARCH_PORT}") |
|
|
self.email = EmailClient(f"http://localhost:{MCP_EMAIL_PORT}") |
|
|
self.calendar = CalendarClient(f"http://localhost:{MCP_CALENDAR_PORT}") |
|
|
self.store = StoreClient(f"http://localhost:{MCP_STORE_PORT}") |
|
|
print("MCP Registry: Using HTTP clients (local development mode)") |
|
|
|
|
|
async def connect(self): |
|
|
"""Connect all clients""" |
|
|
if not self.use_in_memory: |
|
|
|
|
|
await self.search.connect() |
|
|
await self.email.connect() |
|
|
await self.calendar.connect() |
|
|
await self.store.connect() |
|
|
else: |
|
|
|
|
|
print("MCP Registry: In-memory services ready (no connection needed)") |
|
|
|
|
|
async def health_check(self): |
|
|
"""Check health of all MCP servers""" |
|
|
status = {} |
|
|
|
|
|
if self.use_in_memory: |
|
|
|
|
|
status = { |
|
|
"search": "healthy (in-memory)", |
|
|
"email": "healthy (in-memory)", |
|
|
"calendar": "healthy (in-memory)", |
|
|
"store": "healthy (in-memory)", |
|
|
"analytics": "healthy (in-memory)", |
|
|
"enrichment": "healthy (in-memory)", |
|
|
"validation": "healthy (in-memory)", |
|
|
"summary": "healthy (in-memory)" |
|
|
} |
|
|
else: |
|
|
|
|
|
for name, client in [ |
|
|
("search", self.search), |
|
|
("email", self.email), |
|
|
("calendar", self.calendar), |
|
|
("store", self.store) |
|
|
]: |
|
|
try: |
|
|
await client.call("health") |
|
|
status[name] = "healthy" |
|
|
except Exception as e: |
|
|
status[name] = f"unhealthy: {str(e)}" |
|
|
|
|
|
return status |
|
|
|
|
|
def get_search_client(self): |
|
|
return self.search |
|
|
|
|
|
def get_email_client(self): |
|
|
return self.email |
|
|
|
|
|
def get_calendar_client(self): |
|
|
return self.calendar |
|
|
|
|
|
def get_store_client(self): |
|
|
return self.store |
|
|
|
|
|
def get_analytics_service(self): |
|
|
"""Get analytics service (only available in in-memory mode)""" |
|
|
return getattr(self, 'analytics', None) |
|
|
|
|
|
def get_enrichment_service(self): |
|
|
"""Get enrichment service (only available in in-memory mode)""" |
|
|
return getattr(self, 'enrichment', None) |
|
|
|
|
|
def get_validation_service(self): |
|
|
"""Get validation service (only available in in-memory mode)""" |
|
|
return getattr(self, 'validation', None) |
|
|
|
|
|
def get_summary_service(self): |
|
|
"""Get summary service (only available in in-memory mode)""" |
|
|
return getattr(self, 'summary', None) |
|
|
|
|
|
|
|
|
|
|
|
_registry_instance = None |
|
|
|
|
|
def get_mcp_registry(use_in_memory: bool = None) -> MCPRegistry: |
|
|
""" |
|
|
Get or create the global MCP registry instance. |
|
|
|
|
|
Args: |
|
|
use_in_memory: If True, use in-memory services. If None, use env var. |
|
|
|
|
|
Returns: |
|
|
MCPRegistry instance |
|
|
""" |
|
|
global _registry_instance |
|
|
|
|
|
if _registry_instance is None: |
|
|
_registry_instance = MCPRegistry(use_in_memory=use_in_memory) |
|
|
|
|
|
return _registry_instance |
|
|
|