File size: 4,756 Bytes
8bab08d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 |
"""
In-Memory MCP Client Wrappers
These clients wrap the in-memory services to provide the same interface as HTTP clients
"""
from typing import Dict, List, Optional
from mcp.in_memory_services import (
get_in_memory_store,
get_in_memory_search,
get_in_memory_email,
get_in_memory_calendar
)
from app.schema import Prospect, Company, Contact, Fact, Thread
import logging
# Module-level logger named after this module; none of the client classes
# visible in this section call it.
logger = logging.getLogger(__name__)
class InMemoryStoreClient:
    """In-memory store client (compatible with HTTP client interface).

    Every method awaits the matching coroutine on the service object obtained
    from ``get_in_memory_store()``. The ``save_*`` methods accept either a
    plain ``dict`` or a model object and always hand the service a ``dict``;
    the read methods rebuild schema objects from the dicts the service
    returns.
    """

    def __init__(self):
        # Service object that every method below delegates to.
        self.service = get_in_memory_store()

    @staticmethod
    def _to_payload(record):
        """Return ``record`` as the plain dict passed to the service.

        Dicts are passed through unchanged (same object). Any other object is
        serialized with its ``model_dump()`` method when it has one, and with
        its ``dict()`` method otherwise.
        """
        if isinstance(record, dict):
            return record
        # NOTE(review): the app.schema models are presumably pydantic models,
        # where ``model_dump`` is the newer spelling of ``dict`` -- confirm.
        dump = getattr(record, "model_dump", None)
        if callable(dump):
            return dump()
        return record.dict()

    async def save_prospect(self, prospect) -> str:
        """Save a prospect given as a dict or a model object.

        Returns whatever the service's ``save_prospect`` returns.
        """
        return await self.service.save_prospect(self._to_payload(prospect))

    async def get_prospect(self, prospect_id: str) -> Optional[Prospect]:
        """Get a prospect by id.

        Returns a ``Prospect`` built from the service's data, or ``None`` when
        the service returns a falsy value.
        """
        data = await self.service.get_prospect(prospect_id)
        if data:
            return Prospect(**data)
        return None

    async def list_prospects(self) -> List[Prospect]:
        """List all prospects as ``Prospect`` objects."""
        data = await self.service.list_prospects()
        return [Prospect(**p) for p in data]

    async def save_company(self, company: Company) -> str:
        """Save a company; a plain dict is accepted as well as a ``Company``.

        Returns whatever the service's ``save_company`` returns.
        """
        return await self.service.save_company(self._to_payload(company))

    async def get_company(self, company_id: str) -> Optional[Company]:
        """Get a company by id.

        Returns a ``Company`` built from the service's data, or ``None`` when
        the service returns a falsy value.
        """
        data = await self.service.get_company(company_id)
        if data:
            return Company(**data)
        return None

    async def save_fact(self, fact) -> str:
        """Save a fact given as a dict or a model object."""
        return await self.service.save_fact(self._to_payload(fact))

    async def save_contact(self, contact) -> str:
        """Save a contact given as a dict or a model object."""
        return await self.service.save_contact(self._to_payload(contact))

    async def list_contacts_by_domain(self, domain: str) -> List[Contact]:
        """List the contacts the service returns for ``domain``."""
        data = await self.service.list_contacts_by_domain(domain)
        return [Contact(**c) for c in data]

    async def check_suppression(self, supp_type: str, value: str) -> bool:
        """Check suppression; returns the service's answer unchanged."""
        return await self.service.check_suppression(supp_type, value)

    async def save_handoff(self, packet: Dict) -> str:
        """Save a handoff packet; the dict is forwarded as-is."""
        return await self.service.save_handoff(packet)

    async def clear_all(self) -> str:
        """Clear all data by delegating to the service's ``clear_all``."""
        return await self.service.clear_all()
class InMemorySearchClient:
    """Search client backed by the in-memory search service.

    Exposes ``query`` (the MCP protocol method) plus ``search`` and
    ``search_news`` so it can stand in for the WebSearchService interface.
    """

    def __init__(self):
        search_service = get_in_memory_search()
        self.service = search_service

    async def query(self, q: str, max_results: int = 5) -> List[Dict]:
        """Run ``q`` against the service and return its results unchanged."""
        hits = await self.service.query(q, max_results)
        return hits

    async def search(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]:
        """
        WebSearchService-style entry point.

        Forwards to ``query()``; extra keyword arguments are accepted and
        ignored.
        """
        hits = await self.query(query, max_results)
        return hits

    async def search_news(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]:
        """
        WebSearchService-style news search.

        Currently identical to a regular search: forwards to ``query()`` and
        ignores extra keyword arguments.
        """
        hits = await self.query(query, max_results)
        return hits
class InMemoryEmailClient:
    """Email client backed by the in-memory email service."""

    def __init__(self):
        email_service = get_in_memory_email()
        self.service = email_service

    async def send(self, to: str, subject: str, body: str, prospect_id: str) -> str:
        """Forward the message to the service and return the service's result."""
        outcome = await self.service.send(to, subject, body, prospect_id)
        return outcome

    async def get_thread(self, prospect_id: str) -> Optional[Thread]:
        """Return the prospect's thread as a ``Thread``.

        Returns ``None`` when the service gives back a falsy value.
        """
        raw = await self.service.get_thread(prospect_id)
        if not raw:
            return None
        return Thread(**raw)
class InMemoryCalendarClient:
    """Calendar client backed by the in-memory calendar service."""

    def __init__(self):
        calendar_service = get_in_memory_calendar()
        self.service = calendar_service

    async def suggest_slots(self) -> List[Dict[str, str]]:
        """Return the slot suggestions produced by the service, unchanged."""
        slots = await self.service.suggest_slots()
        return slots

    async def generate_ics(self, slot: Dict) -> str:
        """Pass ``slot`` to the service's ICS generator and return its output."""
        ics_text = await self.service.generate_ics(slot)
        return ics_text
|