"""
In-Memory MCP Services for Hugging Face Spaces

These services run in-memory without requiring separate server processes.
"""
import json
import asyncio
from typing import Dict, List, Optional, Any
from pathlib import Path
from datetime import datetime, timedelta, timezone
from services.web_search import get_search_service
import logging

logger = logging.getLogger(__name__)


def _naive_utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Same value and ``isoformat()`` output as the deprecated
    ``datetime.utcnow()``, without its deprecation warning.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _lower(value: Any) -> str:
    """Lower-case *value* if it is a string; any other value maps to ``""``."""
    return value.lower() if isinstance(value, str) else ""


class InMemoryStoreService:
    """In-memory store service (replaces store_server.py for HF Spaces).

    Records are plain dicts kept in per-kind lists, seeded from JSON files in
    ``data_dir`` when the service is constructed. Mutating methods serialize
    through an ``asyncio.Lock``. Nothing is written back to disk by this class.
    """

    # Instance attribute -> JSON file (relative to data_dir) it is seeded from.
    _SEED_FILES = {
        "prospects": "prospects.json",
        "companies": "companies_store.json",
        "facts": "facts.json",
        "contacts": "contacts.json",
        "handoffs": "handoffs.json",
        "suppressions": "suppression.json",
    }

    def __init__(self, data_dir: Optional[Path] = None):
        self.data_dir = data_dir or Path(__file__).parent.parent / "data"
        self.data_dir.mkdir(exist_ok=True)

        # In-memory storage
        self.prospects: List[Dict] = []
        self.companies: List[Dict] = []
        self.facts: List[Dict] = []
        self.contacts: List[Dict] = []
        self.handoffs: List[Dict] = []
        self.suppressions: List[Dict] = []

        # Serializes writers that share the event loop
        self.lock = asyncio.Lock()

        # Load initial data
        self._load_data()

    def _load_data(self):
        """Seed the in-memory lists from the JSON files in ``data_dir``.

        Each file is loaded independently: a missing file is skipped, and an
        unreadable/malformed file is logged and leaves only its own list empty
        instead of aborting the loads that follow it.
        """
        failures = 0
        for attr, filename in self._SEED_FILES.items():
            path = self.data_dir / filename
            if not path.exists():
                continue
            try:
                with open(path, encoding="utf-8") as f:
                    content = json.load(f)
            except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
                failures += 1
                logger.error(f"Error loading store data from {path}: {e}")
                continue
            if content and not isinstance(content, list):
                failures += 1
                logger.error(
                    f"Error loading store data from {path}: expected a JSON list, "
                    f"got {type(content).__name__}"
                )
                continue
            setattr(self, attr, content or [])
        if not failures:
            logger.info("In-memory store loaded successfully")

    def _find_prospect_index(self, prospect: Dict) -> Optional[int]:
        """Return the index of the stored prospect *prospect* should replace, if any.

        Match by ``id`` first; otherwise by case-insensitive ``company.domain``.
        Caller must hold ``self.lock``.
        """
        prospect_id = prospect.get("id")
        if prospect_id is not None:
            for i, existing in enumerate(self.prospects):
                if isinstance(existing, dict) and existing.get("id") == prospect_id:
                    return i

        raw_domain = (prospect.get("company") or {}).get("domain")
        domain = _lower(raw_domain)
        if domain:
            for i, existing in enumerate(self.prospects):
                if not isinstance(existing, dict):
                    continue
                existing_domain = _lower((existing.get("company") or {}).get("domain"))
                if existing_domain == domain:
                    logger.warning(f"Duplicate prospect detected for domain: {raw_domain}. Updating existing prospect.")
                    return i
        return None

    async def save_prospect(self, prospect: Dict) -> str:
        """Save or update a prospect (prevents duplicates by domain).

        An existing prospect with the same ``id`` is replaced; otherwise one
        whose ``company.domain`` matches case-insensitively is replaced;
        otherwise *prospect* is appended.

        Returns:
            Always ``"saved"``.
        """
        async with self.lock:
            index = self._find_prospect_index(prospect)
            if index is None:
                self.prospects.append(prospect)
            else:
                self.prospects[index] = prospect
        return "saved"

    async def get_prospect(self, prospect_id: str) -> Optional[Dict]:
        """Return the stored prospect with ``id == prospect_id``, or ``None``."""
        for p in self.prospects:
            if isinstance(p, dict) and p.get("id") == prospect_id:
                return p
        return None

    async def list_prospects(self) -> List[Dict]:
        """Return a shallow copy of all prospects (the store's list is not exposed)."""
        return list(self.prospects)

    async def save_company(self, company: Dict) -> str:
        """Replace the company with the same ``id``, or append it. Returns ``"saved"``."""
        async with self.lock:
            company_id = company.get("id")
            for i, c in enumerate(self.companies):
                if isinstance(c, dict) and c.get("id") == company_id:
                    self.companies[i] = company
                    break
            else:
                self.companies.append(company)
        return "saved"

    async def get_company(self, company_id: str) -> Optional[Dict]:
        """Return a company by ``id``.

        Looks in the in-memory list first, then falls back to the
        ``companies.json`` seed file in ``data_dir`` (re-read on each call).
        Returns ``None`` if no match is found or the seed file cannot be read.
        """
        # Check in-memory
        for c in self.companies:
            if isinstance(c, dict) and c.get("id") == company_id:
                return c

        # Check seed file
        seed_file = self.data_dir / "companies.json"
        if not seed_file.exists():
            return None
        try:
            with open(seed_file, encoding="utf-8") as f:
                seeds = json.load(f)
        except (OSError, ValueError) as e:
            logger.error(f"Error reading company seed file {seed_file}: {e}")
            return None
        for c in seeds or []:
            if isinstance(c, dict) and c.get("id") == company_id:
                return c
        return None

    async def save_fact(self, fact: Dict) -> str:
        """Append *fact* unless a fact with the same non-empty ``id`` exists.

        Facts without an ``id`` are always appended. Returns ``"saved"``.
        """
        async with self.lock:
            existing_ids = {f.get("id") for f in self.facts if isinstance(f, dict) and f.get("id")}
            if fact.get("id") not in existing_ids:
                self.facts.append(fact)
        return "saved"

    async def save_contact(self, contact: Dict) -> str:
        """Save a contact (prevents duplicates by email).

        A stored contact with the same non-``None`` ``id`` is replaced. Otherwise,
        if a stored contact has the same email (case-insensitive), nothing is
        saved. Otherwise *contact* is appended.

        Returns:
            ``"saved"`` or ``"duplicate_skipped"``.
        """
        async with self.lock:
            # Check for duplicate by ID first. A missing id must not match
            # other id-less contacts, or it would silently overwrite them.
            contact_id = contact.get("id")
            if contact_id is not None:
                for i, c in enumerate(self.contacts):
                    if isinstance(c, dict) and c.get("id") == contact_id:
                        self.contacts[i] = contact
                        return "saved"

            # If not found by ID, check for duplicate by email
            email = _lower(contact.get("email"))
            if email and any(
                isinstance(c, dict) and _lower(c.get("email")) == email
                for c in self.contacts
            ):
                logger.warning(f"Duplicate contact detected for email: {email}. Skipping.")
                return "duplicate_skipped"

            # Not a duplicate, add it
            self.contacts.append(contact)
        return "saved"

    async def list_contacts_by_domain(self, domain: str) -> List[Dict]:
        """Return contacts whose email ends with ``@<domain>`` (case-insensitive)."""
        suffix = f"@{_lower(domain)}"
        return [
            c for c in self.contacts
            if isinstance(c, dict) and _lower(c.get("email")).endswith(suffix)
        ]

    @staticmethod
    def _suppression_expired(expires_at: Any, now: datetime) -> bool:
        """Return True if *expires_at* (ISO-8601 text) is strictly before *now*.

        Empty values never expire. Timestamps without an offset are treated as
        UTC. Unparseable values are logged and treated as not expired, so the
        suppression stays in force (fail closed).
        """
        if not expires_at:
            return False
        try:
            expires = datetime.fromisoformat(str(expires_at).replace("Z", "+00:00"))
        except ValueError:
            logger.warning(f"Unparseable suppression expiry {expires_at!r}; treating as active")
            return False
        if expires.tzinfo is None:
            expires = expires.replace(tzinfo=timezone.utc)
        return expires < now

    async def check_suppression(self, supp_type: str, value: str) -> bool:
        """Return True if an unexpired suppression matches ``(supp_type, value)``.

        Both ``type`` and ``value`` are compared exactly (case-sensitive).
        """
        # Aware "now": comparing an aware expiry with a naive now raises TypeError.
        now = datetime.now(timezone.utc)
        for supp in self.suppressions:
            if not isinstance(supp, dict):
                continue
            if supp.get("type") != supp_type or supp.get("value") != value:
                continue
            if self._suppression_expired(supp.get("expires_at"), now):
                continue
            return True
        return False

    async def save_handoff(self, packet: Dict) -> str:
        """Append a handoff packet. Returns ``"saved"``."""
        async with self.lock:
            self.handoffs.append(packet)
        return "saved"

    async def clear_all(self) -> str:
        """Clear prospects, companies, facts, contacts and handoffs.

        Suppressions are intentionally left in place, and the JSON files on
        disk are not modified. Returns ``"cleared"``.
        """
        async with self.lock:
            self.prospects = []
            self.companies = []
            self.facts = []
            self.contacts = []
            self.handoffs = []
        return "cleared"


class InMemorySearchService:
    """In-memory search service using web search"""

    def __init__(self):
        self.search = get_search_service()
        logger.info("In-memory search service initialized")

    async def query(self, q: str, max_results: int = 5) -> List[Dict]:
        """Run a web search for *q* and return results in MCP format.

        Each result carries ``text`` (MCP) and ``body`` (legacy
        WebSearchService key) with the same content. Empty queries return ``[]``.
        """
        if not q:
            return []

        logger.info(f"In-memory search query: '{q}'")

        # Perform real web search
        search_results = await self.search.search(q, max_results=max_results)

        ts = _naive_utcnow().isoformat()
        results = []
        for result in search_results or []:
            body_text = result.get('body', '')
            results.append({
                "text": body_text,  # MCP protocol format
                "body": body_text,  # Backward compatibility with WebSearchService
                "title": result.get('title', ''),
                "source": result.get('source', ''),
                "url": result.get('url', ''),
                "ts": ts,
                "confidence": 0.8
            })

        logger.info(f"Returning {len(results)} search results")
        return results


class InMemoryEmailService:
    """In-memory email service (mock for Gradio demo)"""

    def __init__(self):
        self.threads: Dict[str, Dict] = {}
        self.messages: List[Dict] = []
        logger.info("In-memory email service initialized")

    async def send(self, to: str, subject: str, body: str, prospect_id: str) -> str:
        """Simulate sending an email and return the new thread id.

        Each call starts a fresh thread for *prospect_id*, replacing any
        previous thread for that prospect in ``self.threads``. All messages
        are also kept in ``self.messages``.
        """
        now = _naive_utcnow()
        thread_id = f"thread_{prospect_id}_{now.timestamp()}"

        # Create thread
        self.threads[prospect_id] = {
            "id": thread_id,
            "prospect_id": prospect_id,
            "messages": []
        }

        # Create message
        message = {
            "id": f"msg_{len(self.messages)}",
            "thread_id": thread_id,
            "prospect_id": prospect_id,
            "direction": "outbound",
            "subject": subject,
            "body": body,
            "sent_at": now.isoformat()
        }

        self.threads[prospect_id]["messages"].append(message)
        self.messages.append(message)

        logger.info(f"Simulated email sent to {to}")
        return thread_id

    async def get_thread(self, prospect_id: str) -> Optional[Dict]:
        """Return the latest email thread for a prospect, or ``None``."""
        return self.threads.get(prospect_id)


class InMemoryCalendarService:
    """In-memory calendar service (mock for Gradio demo)"""

    # (days after tomorrow, start hour, title) for each one-hour mock slot
    _MOCK_SLOTS = (
        (0, 10, "Initial consultation"),
        (1, 14, "Product demo"),
        (2, 9, "Follow-up discussion"),
    )

    def __init__(self):
        logger.info("In-memory calendar service initialized")

    async def suggest_slots(self) -> List[Dict[str, str]]:
        """Return three mock one-hour slots starting tomorrow (local time).

        Each slot has ``start_iso``, ``end_iso`` (naive local ISO-8601, whole
        seconds) and ``title``.
        """
        base_date = datetime.now() + timedelta(days=1)
        slots = []
        for day_offset, hour, title in self._MOCK_SLOTS:
            start = (base_date + timedelta(days=day_offset)).replace(
                hour=hour, minute=0, second=0, microsecond=0
            )
            slots.append({
                "start_iso": start.isoformat(),
                "end_iso": (start + timedelta(hours=1)).isoformat(),
                "title": title
            })
        return slots

    async def generate_ics(self, slot: Dict) -> str:
        """Return a placeholder ICS calendar (mock; *slot* is currently ignored)."""
        return "BEGIN:VCALENDAR\nVERSION:2.0\nEND:VCALENDAR"


# Singleton instances
_store_service: Optional[InMemoryStoreService] = None
_search_service: Optional[InMemorySearchService] = None
_email_service: Optional[InMemoryEmailService] = None
_calendar_service: Optional[InMemoryCalendarService] = None


def get_in_memory_store() -> InMemoryStoreService:
    """Get or create in-memory store service"""
    global _store_service
    if _store_service is None:
        _store_service = InMemoryStoreService()
    return _store_service


def get_in_memory_search() -> InMemorySearchService:
    """Get or create in-memory search service"""
    global _search_service
    if _search_service is None:
        _search_service = InMemorySearchService()
    return _search_service


def get_in_memory_email() -> InMemoryEmailService:
    """Get or create in-memory email service"""
    global _email_service
    if _email_service is None:
        _email_service = InMemoryEmailService()
    return _email_service


def get_in_memory_calendar() -> InMemoryCalendarService:
    """Get or create in-memory calendar service"""
    global _calendar_service
    if _calendar_service is None:
        _calendar_service = InMemoryCalendarService()
    return _calendar_service