# Source: cx_ai_agent_v1/mcp/in_memory_services.py
# Committed by muzakkirhussain011 in 8bab08d ("Add application files (text files only)")
"""
In-Memory MCP Services for Hugging Face Spaces
These services run in-memory without requiring separate server processes
"""
import json
import asyncio
from typing import Dict, List, Optional, Any
from pathlib import Path
from datetime import datetime
from services.web_search import get_search_service
import logging
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class InMemoryStoreService:
    """In-memory store service (replaces store_server.py for HF Spaces).

    Keeps prospects, companies, facts, contacts, handoffs and suppressions in
    plain lists of dicts, seeded once from JSON files under ``data_dir``.
    Nothing is written back to disk by this class. Mutating coroutines
    serialize on ``self.lock``; read-only coroutines do not take the lock.
    """

    # (attribute name, JSON file name) pairs loaded from ``data_dir``.
    _DATA_FILES = (
        ("prospects", "prospects.json"),
        ("companies", "companies_store.json"),
        ("facts", "facts.json"),
        ("contacts", "contacts.json"),
        ("handoffs", "handoffs.json"),
        ("suppressions", "suppression.json"),
    )

    def __init__(self, data_dir: Optional[Path] = None):
        """Create the store and load any existing JSON data.

        Args:
            data_dir: Directory holding the JSON files (a ``Path`` or path
                string). Defaults to the ``data`` directory two levels above
                this module. It is created, including parents, if missing.
        """
        self.data_dir = Path(data_dir) if data_dir else Path(__file__).parent.parent / "data"
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # In-memory storage
        self.prospects: List[Dict] = []
        self.companies: List[Dict] = []
        self.facts: List[Dict] = []
        self.contacts: List[Dict] = []
        self.handoffs: List[Dict] = []
        self.suppressions: List[Dict] = []
        # Serializes the mutating coroutines (save_*, clear_all).
        self.lock = asyncio.Lock()
        self._load_data()

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _read_json_list(path: Path) -> Any:
        """Return the parsed JSON in *path*, or [] if the file is missing or empty."""
        if not path.exists():
            return []
        with open(path, encoding="utf-8") as f:
            content = json.load(f)
        return content if content else []

    @staticmethod
    def _index_by_id(items: List[Any], item_id: Any) -> Optional[int]:
        """Return the index of the first dict in *items* whose "id" equals *item_id*.

        Returns None when no item matches, or when *item_id* is None, so that
        records lacking an id never match (and overwrite) each other.
        """
        if item_id is None:
            return None
        for i, item in enumerate(items):
            if isinstance(item, dict) and item.get("id") == item_id:
                return i
        return None

    @staticmethod
    def _company_domain(record: Dict) -> str:
        """Return the lower-cased ``company.domain`` of *record*, or '' if absent/None."""
        return ((record.get("company") or {}).get("domain") or "").lower()

    def _index_by_domain(self, prospect: Dict) -> Optional[int]:
        """Return the index of an existing prospect with the same company domain."""
        domain = (prospect.get("company") or {}).get("domain") or ""
        if not domain:
            return None
        wanted = domain.lower()
        for i, existing in enumerate(self.prospects):
            if isinstance(existing, dict) and self._company_domain(existing) == wanted:
                logger.warning(f"Duplicate prospect detected for domain: {domain}. Updating existing prospect.")
                return i
        return None

    @staticmethod
    def _is_expired(expires_at: Any) -> bool:
        """Return True only if *expires_at* parses to a moment in the past.

        Accepts ISO-8601 strings ending in ``Z`` or carrying an offset; naive
        timestamps are interpreted as UTC. Missing, non-string or unparseable
        values return False, so the suppression stays in force (fail safe).
        """
        from datetime import timezone

        if not expires_at or not isinstance(expires_at, str):
            return False
        try:
            expires = datetime.fromisoformat(expires_at.replace("Z", "+00:00"))
        except ValueError:
            return False
        if expires.tzinfo is None:
            expires = expires.replace(tzinfo=timezone.utc)
        # Compare aware-to-aware; mixing naive and aware datetimes raises TypeError.
        return expires < datetime.now(timezone.utc)

    def _load_data(self):
        """Load every collection from its JSON file in ``data_dir``.

        Each file is loaded independently. A missing file leaves its
        collection empty. An unreadable or invalid file is logged and skipped
        without preventing the remaining files from loading.
        """
        failed = False
        for attr, filename in self._DATA_FILES:
            try:
                setattr(self, attr, self._read_json_list(self.data_dir / filename))
            except (OSError, ValueError) as e:
                failed = True
                logger.error(f"Error loading store data from {filename}: {e}")
        if not failed:
            logger.info("In-memory store loaded successfully")

    # --------------------------------------------------------------- prospects

    async def save_prospect(self, prospect: Dict) -> str:
        """Insert or replace a prospect, de-duplicating by company domain.

        A match is an existing prospect with the same non-None ``id``, or
        failing that, one whose ``company.domain`` equals this prospect's
        (case-insensitive). A match is replaced in place by *prospect*,
        including its id. Otherwise *prospect* is appended.

        Returns:
            Always ``"saved"``.
        """
        async with self.lock:
            index = self._index_by_id(self.prospects, prospect.get("id"))
            if index is None:
                index = self._index_by_domain(prospect)
            if index is None:
                self.prospects.append(prospect)
            else:
                self.prospects[index] = prospect
        return "saved"

    async def get_prospect(self, prospect_id: str) -> Optional[Dict]:
        """Return the prospect whose "id" equals *prospect_id*, or None."""
        index = self._index_by_id(self.prospects, prospect_id)
        return None if index is None else self.prospects[index]

    async def list_prospects(self) -> List[Dict]:
        """Return the live list of all prospects (not a copy)."""
        return self.prospects

    # --------------------------------------------------------------- companies

    async def save_company(self, company: Dict) -> str:
        """Insert a company, or replace the one with the same non-None "id".

        Returns:
            Always ``"saved"``.
        """
        async with self.lock:
            index = self._index_by_id(self.companies, company.get("id"))
            if index is None:
                self.companies.append(company)
            else:
                self.companies[index] = company
        return "saved"

    async def get_company(self, company_id: str) -> Optional[Dict]:
        """Return a company by id from memory, falling back to ``companies.json``.

        The seed file is re-read from ``data_dir`` on every in-memory miss.
        """
        index = self._index_by_id(self.companies, company_id)
        if index is not None:
            return self.companies[index]
        seed_file = self.data_dir / "companies.json"
        if seed_file.exists():
            seeds = self._read_json_list(seed_file)
            index = self._index_by_id(seeds, company_id)
            if index is not None:
                return seeds[index]
        return None

    # ------------------------------------------------------------------- facts

    async def save_fact(self, fact: Dict) -> str:
        """Append a fact unless one with the same truthy "id" is already stored.

        Facts without a truthy id are always appended.

        Returns:
            Always ``"saved"``.
        """
        async with self.lock:
            fact_id = fact.get("id")
            if not fact_id or self._index_by_id(self.facts, fact_id) is None:
                self.facts.append(fact)
        return "saved"

    # ---------------------------------------------------------------- contacts

    async def save_contact(self, contact: Dict) -> str:
        """Save a contact, preventing duplicates by email.

        If a stored contact has the same non-None "id", it is replaced. If not,
        a contact whose email matches case-insensitively causes the new one to
        be skipped. Otherwise the contact is appended.

        Returns:
            ``"saved"``, or ``"duplicate_skipped"`` when the email already exists.
        """
        async with self.lock:
            index = self._index_by_id(self.contacts, contact.get("id"))
            if index is not None:
                self.contacts[index] = contact
                return "saved"
            email = (contact.get("email") or "").lower()
            if email:
                for existing in self.contacts:
                    if isinstance(existing, dict) and (existing.get("email") or "").lower() == email:
                        logger.warning(f"Duplicate contact detected for email: {email}. Skipping.")
                        return "duplicate_skipped"
            self.contacts.append(contact)
        return "saved"

    async def list_contacts_by_domain(self, domain: str) -> List[Dict]:
        """Return contacts whose email ends with ``@<domain>`` (case-insensitive)."""
        suffix = f"@{domain}".lower()
        return [
            c for c in self.contacts
            if isinstance(c, dict)
            and isinstance(c.get("email"), str)
            and c["email"].lower().endswith(suffix)
        ]

    # ------------------------------------------------------------ suppressions

    async def check_suppression(self, supp_type: str, value: str) -> bool:
        """Return True if an unexpired suppression matches *supp_type* and *value*.

        An entry whose ``expires_at`` lies in the past is ignored. An entry with
        no expiry, or with an unparseable one, is treated as active.
        """
        for supp in self.suppressions:
            if (
                isinstance(supp, dict)
                and supp.get("type") == supp_type
                and supp.get("value") == value
                and not self._is_expired(supp.get("expires_at"))
            ):
                return True
        return False

    # ---------------------------------------------------------------- handoffs

    async def save_handoff(self, packet: Dict) -> str:
        """Append a handoff packet (no de-duplication).

        Returns:
            Always ``"saved"``.
        """
        async with self.lock:
            self.handoffs.append(packet)
        return "saved"

    async def clear_all(self) -> str:
        """Empty every collection except ``suppressions``, which is left untouched.

        Returns:
            Always ``"cleared"``.
        """
        async with self.lock:
            self.prospects = []
            self.companies = []
            self.facts = []
            self.contacts = []
            self.handoffs = []
        return "cleared"
class InMemorySearchService:
    """Search service that wraps the shared web-search client.

    Each hit is reshaped into the MCP result format (``text``), and the
    ``body`` key is kept as well for callers written against
    WebSearchService.
    """

    def __init__(self):
        self.search = get_search_service()
        logger.info("In-memory search service initialized")

    @staticmethod
    def _to_mcp(hit: Dict) -> Dict:
        """Convert one raw search hit into the MCP/back-compat result dict."""
        snippet = hit.get('body', '')
        return {
            "text": snippet,    # MCP protocol format
            "body": snippet,    # Backward compatibility with WebSearchService
            "title": hit.get('title', ''),
            "source": hit.get('source', ''),
            "url": hit.get('url', ''),
            "ts": datetime.utcnow().isoformat(),
            "confidence": 0.8
        }

    async def query(self, q: str, max_results: int = 5) -> List[Dict]:
        """Run a web search for *q* and return up to *max_results* formatted hits.

        An empty or None query short-circuits to ``[]`` without searching.
        """
        if not q:
            return []
        logger.info(f"In-memory search query: '{q}'")
        raw_hits = await self.search.search(q, max_results=max_results)
        formatted = [self._to_mcp(hit) for hit in raw_hits]
        logger.info(f"Returning {len(formatted)} search results")
        return formatted
class InMemoryEmailService:
    """Mock email service for the Gradio demo: records messages and sends nothing."""

    def __init__(self):
        # prospect_id -> the thread created by the most recent send() for it
        self.threads = {}
        # every simulated outbound message, in send order
        self.messages = []
        logger.info("In-memory email service initialized")

    async def send(self, to: str, subject: str, body: str, prospect_id: str) -> str:
        """Simulate sending an email and return the id of the new thread.

        Every call creates a fresh thread for *prospect_id*, replacing any
        thread previously stored for that prospect in ``self.threads``. The
        message is also appended to the global ``self.messages`` log.
        """
        thread_id = f"thread_{prospect_id}_{datetime.utcnow().timestamp()}"
        thread = {"id": thread_id, "prospect_id": prospect_id, "messages": []}
        self.threads[prospect_id] = thread

        outbound = dict(
            id=f"msg_{len(self.messages)}",
            thread_id=thread_id,
            prospect_id=prospect_id,
            direction="outbound",
            subject=subject,
            body=body,
            sent_at=datetime.utcnow().isoformat(),
        )
        thread["messages"].append(outbound)
        self.messages.append(outbound)

        logger.info(f"Simulated email sent to {to}")
        return thread_id

    async def get_thread(self, prospect_id: str) -> Optional[Dict]:
        """Return the stored thread for *prospect_id*, or None if there is none."""
        if prospect_id in self.threads:
            return self.threads[prospect_id]
        return None
class InMemoryCalendarService:
    """Mock calendar service for the Gradio demo (no real calendar backend)."""

    def __init__(self):
        logger.info("In-memory calendar service initialized")

    async def suggest_slots(self) -> List[Dict[str, str]]:
        """Return three one-hour mock meeting slots on consecutive days.

        The slots are tomorrow 10:00, the next day 14:00 and the day after
        09:00, all in naive local time. Each slot is a dict with ``start_iso``,
        ``end_iso`` and ``title``. Minutes, seconds and microseconds are
        zeroed so the timestamps fall exactly on the hour.
        """
        from datetime import datetime, timedelta

        tomorrow = datetime.now() + timedelta(days=1)
        # (days after tomorrow, start hour, title); every slot lasts one hour.
        plan = (
            (0, 10, "Initial consultation"),
            (1, 14, "Product demo"),
            (2, 9, "Follow-up discussion"),
        )
        slots = []
        for day_offset, hour, title in plan:
            start = (tomorrow + timedelta(days=day_offset)).replace(
                hour=hour, minute=0, second=0, microsecond=0
            )
            end = start + timedelta(hours=1)
            slots.append({
                "start_iso": start.isoformat(),
                "end_iso": end.isoformat(),
                "title": title,
            })
        return slots

    async def generate_ics(self, slot: Dict) -> str:
        """Return a placeholder ICS payload.

        This is a mock. *slot* is currently ignored, and the same minimal
        VCALENDAR envelope (VERSION 2.0, no events) is returned every time.
        """
        return "BEGIN:VCALENDAR\nVERSION:2.0\nEND:VCALENDAR"
# Module-level singleton instances, one per service. Each starts as None and is
# created on the first call to the matching get_in_memory_* function.
_store_service: Optional[InMemoryStoreService] = None
_search_service: Optional[InMemorySearchService] = None
_email_service: Optional[InMemoryEmailService] = None
_calendar_service: Optional[InMemoryCalendarService] = None
def get_in_memory_store() -> InMemoryStoreService:
    """Return the shared InMemoryStoreService, constructing it on first use."""
    global _store_service
    if _store_service is not None:
        return _store_service
    _store_service = InMemoryStoreService()
    return _store_service
def get_in_memory_search() -> InMemorySearchService:
    """Return the shared InMemorySearchService, constructing it on first use."""
    global _search_service
    if _search_service is not None:
        return _search_service
    _search_service = InMemorySearchService()
    return _search_service
def get_in_memory_email() -> InMemoryEmailService:
    """Return the shared InMemoryEmailService, constructing it on first use."""
    global _email_service
    if _email_service is not None:
        return _email_service
    _email_service = InMemoryEmailService()
    return _email_service
def get_in_memory_calendar() -> InMemoryCalendarService:
    """Return the shared InMemoryCalendarService, constructing it on first use."""
    global _calendar_service
    if _calendar_service is not None:
        return _calendar_service
    _calendar_service = InMemoryCalendarService()
    return _calendar_service