""" In-Memory MCP Client Wrappers These clients wrap the in-memory services to provide the same interface as HTTP clients """ from typing import Dict, List, Optional from mcp.in_memory_services import ( get_in_memory_store, get_in_memory_search, get_in_memory_email, get_in_memory_calendar ) from app.schema import Prospect, Company, Contact, Fact, Thread import logging logger = logging.getLogger(__name__) class InMemoryStoreClient: """In-memory store client (compatible with HTTP client interface)""" def __init__(self): self.service = get_in_memory_store() async def save_prospect(self, prospect) -> str: """Save a prospect""" if isinstance(prospect, dict): return await self.service.save_prospect(prospect) return await self.service.save_prospect(prospect.dict()) async def get_prospect(self, prospect_id: str) -> Optional[Prospect]: """Get a prospect""" data = await self.service.get_prospect(prospect_id) if data: return Prospect(**data) return None async def list_prospects(self) -> List[Prospect]: """List all prospects""" data = await self.service.list_prospects() return [Prospect(**p) for p in data] async def save_company(self, company: Company) -> str: """Save a company""" if isinstance(company, dict): return await self.service.save_company(company) return await self.service.save_company(company.dict()) async def get_company(self, company_id: str) -> Optional[Company]: """Get a company""" data = await self.service.get_company(company_id) if data: return Company(**data) return None async def save_fact(self, fact) -> str: """Save a fact""" if isinstance(fact, dict): return await self.service.save_fact(fact) return await self.service.save_fact(fact.dict()) async def save_contact(self, contact) -> str: """Save a contact""" if isinstance(contact, dict): return await self.service.save_contact(contact) return await self.service.save_contact(contact.dict()) async def list_contacts_by_domain(self, domain: str) -> List[Contact]: """List contacts by domain""" data = await self.service.list_contacts_by_domain(domain) return [Contact(**c) for c in data] async def check_suppression(self, supp_type: str, value: str) -> bool: """Check suppression""" return await self.service.check_suppression(supp_type, value) async def save_handoff(self, packet: Dict) -> str: """Save handoff packet""" return await self.service.save_handoff(packet) async def clear_all(self) -> str: """Clear all data""" return await self.service.clear_all() class InMemorySearchClient: """In-memory search client (compatible with WebSearchService interface)""" def __init__(self): self.service = get_in_memory_search() async def query(self, q: str, max_results: int = 5) -> List[Dict]: """Search query (MCP protocol method)""" return await self.service.query(q, max_results) async def search(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]: """ Search method (compatible with WebSearchService interface) Maps to query() for MCP compatibility """ return await self.query(query, max_results) async def search_news(self, query: str, max_results: int = 5, **kwargs) -> List[Dict]: """ News search method (compatible with WebSearchService interface) Falls back to regular search for now """ return await self.query(query, max_results) class InMemoryEmailClient: """In-memory email client""" def __init__(self): self.service = get_in_memory_email() async def send(self, to: str, subject: str, body: str, prospect_id: str) -> str: """Send email""" return await self.service.send(to, subject, body, prospect_id) async def get_thread(self, prospect_id: str) -> Optional[Thread]: """Get email thread""" data = await self.service.get_thread(prospect_id) if data: return Thread(**data) return None class InMemoryCalendarClient: """In-memory calendar client""" def __init__(self): self.service = get_in_memory_calendar() async def suggest_slots(self) -> List[Dict[str, str]]: """Suggest calendar slots""" return await self.service.suggest_slots() async def generate_ics(self, slot: Dict) -> str: """Generate ICS file""" return await self.service.generate_ics(slot)