# Source file: cx_ai_agent_v1/mcp/database/store_service.py
# Repository page header (not part of the program): muzakkirhussain011's picture
# Commit message: Add application files (text files only)
# Commit: 8bab08d
"""
Database-Backed Store Service for MCP Server
Replaces JSON file storage with enterprise-grade SQL database
"""
import uuid
import logging
from typing import Dict, List, Optional, Any
from datetime import datetime
from .engine import get_db_manager
from .repositories import (
CompanyRepository,
ProspectRepository,
ContactRepository,
FactRepository,
ActivityRepository,
SuppressionRepository,
HandoffRepository
)
from .models import Company, Prospect, Contact, Fact, Suppression, Handoff
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DatabaseStoreService:
    """
    Database-backed store service with enterprise features:
    - SQL database with ACID guarantees
    - Connection pooling
    - Tenant isolation
    - Audit logging
    - Transaction management

    Every public coroutine opens its own session through
    ``self.db_manager.get_session()`` and passes it, together with
    ``self.tenant_id``, to the matching repository class. Records are
    accepted and returned as plain dictionaries.
    """

    # Tables emptied by clear_all(), in the order the DELETEs are issued
    # (children first so foreign keys are respected).
    _CLEAR_ORDER = (
        "activities",
        "handoffs",
        "facts",
        "contacts",
        "prospects",
        "companies",
    )

    def __init__(self, tenant_id: Optional[str] = None):
        """Bind the service to the shared DB manager and an optional tenant.

        Args:
            tenant_id: Tenant identifier forwarded to every repository;
                ``None`` is logged as ``default``.
        """
        self.db_manager = get_db_manager()
        self.tenant_id = tenant_id
        logger.info(
            "Database store service initialized (tenant: %s)",
            tenant_id or "default",
        )

    async def _upsert(self, repo, record: Dict, label: str) -> str:
        """Update ``record`` if its id already exists in ``repo``, else create it.

        Args:
            repo: Repository exposing ``get_by_id``, ``update`` and ``create``.
            record: Record dictionary; must contain an ``"id"`` key.
            label: Entity name used in the debug log line.

        Returns:
            The literal string ``"saved"``.

        Raises:
            KeyError: If ``record`` has no ``"id"`` key.
        """
        record_id = record["id"]
        if await repo.get_by_id(record_id):
            await repo.update(record_id, record)
            logger.debug("Updated %s: %s", label, record_id)
        else:
            await repo.create(record)
            logger.debug("Created %s: %s", label, record_id)
        return "saved"

    async def save_prospect(self, prospect: Dict) -> str:
        """Save or update a prospect.

        Args:
            prospect: Prospect dictionary; must contain an ``"id"`` key.

        Returns:
            ``"saved"``.
        """
        async with self.db_manager.get_session() as session:
            repo = ProspectRepository(session, self.tenant_id)
            return await self._upsert(repo, prospect, "prospect")

    async def get_prospect(self, prospect_id: str) -> Optional[Dict]:
        """Get a prospect by ID.

        The repository is asked to load relationships, so the result may
        carry a nested ``"company"`` dictionary.

        Returns:
            The prospect as a dictionary, or ``None`` when not found.
        """
        async with self.db_manager.get_session() as session:
            repo = ProspectRepository(session, self.tenant_id)
            prospect = await repo.get_by_id(prospect_id, load_relationships=True)
            if prospect:
                return self._prospect_to_dict(prospect)
            return None

    async def list_prospects(self, limit: int = 1000) -> List[Dict]:
        """List prospects.

        Args:
            limit: Maximum number of rows requested from the repository.
                Defaults to 1000, the value previously hard-coded here.

        Returns:
            One dictionary per prospect returned by the repository.
        """
        async with self.db_manager.get_session() as session:
            repo = ProspectRepository(session, self.tenant_id)
            prospects = await repo.list(limit=limit)
            return [self._prospect_to_dict(p) for p in prospects]

    async def save_company(self, company: Dict) -> str:
        """Save or update a company.

        Args:
            company: Company dictionary; must contain an ``"id"`` key.

        Returns:
            ``"saved"``.
        """
        async with self.db_manager.get_session() as session:
            repo = CompanyRepository(session, self.tenant_id)
            return await self._upsert(repo, company, "company")

    async def get_company(self, company_id: str) -> Optional[Dict]:
        """Get a company by ID.

        Returns:
            The company as a dictionary, or ``None`` when not found.
        """
        async with self.db_manager.get_session() as session:
            repo = CompanyRepository(session, self.tenant_id)
            company = await repo.get_by_id(company_id)
            if company:
                return self._company_to_dict(company)
            return None

    async def save_fact(self, fact: Dict) -> str:
        """Save a fact unless one with the same id is already stored.

        The existence check is best effort: if it fails, the failure is
        logged and the insert is still attempted.

        Args:
            fact: Fact dictionary. When it has no ``"id"`` the existence
                check is skipped and the fact is created directly.

        Returns:
            ``"saved"`` both when the fact was created and when it already
            existed.
        """
        async with self.db_manager.get_session() as session:
            repo = FactRepository(session, self.tenant_id)
            fact_id = fact.get("id")

            if fact_id is not None:
                try:
                    # NOTE(review): the session is awaited everywhere else,
                    # which suggests an async session; SQLAlchemy's
                    # AsyncSession has no ``query`` attribute, so this
                    # lookup may fail on every call. Confirm and migrate
                    # to ``select(Fact)`` or a repository lookup.
                    query = session.query(Fact).filter(Fact.id == fact_id)
                    if self.tenant_id:
                        query = query.filter(Fact.tenant_id == self.tenant_id)
                    existing = await session.execute(query)
                    if existing.scalar_one_or_none():
                        logger.debug("Fact already exists: %s", fact_id)
                        return "saved"
                except Exception:
                    # Narrower than a bare ``except`` so task cancellation
                    # and interpreter exit still propagate; the failure is
                    # reported instead of being silently discarded.
                    logger.warning(
                        "Fact existence check failed for %s; attempting insert",
                        fact_id,
                        exc_info=True,
                    )

            await repo.create(fact)
            logger.debug("Created fact: %s", fact_id)
            return "saved"

    async def save_contact(self, contact: Dict) -> str:
        """Save a contact, skipping duplicates.

        Args:
            contact: Contact dictionary. ``"email"`` and ``"id"`` are both
                optional.

        Returns:
            ``"duplicate_skipped"`` when a contact with the same
            (lower-cased) email exists, otherwise ``"saved"``.
        """
        async with self.db_manager.get_session() as session:
            repo = ContactRepository(session, self.tenant_id)

            # ``or ""`` also covers an explicit ``"email": None``.
            email = (contact.get("email") or "").lower()
            if email and await repo.get_by_email(email):
                logger.warning("Contact already exists: %s", email)
                return "duplicate_skipped"

            if "id" in contact and await repo.get_by_id(contact["id"]):
                # No fields are written for an existing id; only the log
                # line is emitted (update logic was left as a placeholder).
                logger.debug("Updating contact: %s", contact["id"])
                return "saved"

            await repo.create(contact)
            # ``.get`` because the id is optional; indexing here used to
            # raise KeyError after the insert had already been issued.
            logger.debug("Created contact: %s", contact.get("id"))
            return "saved"

    async def list_contacts_by_domain(self, domain: str) -> List[Dict]:
        """List contacts by domain.

        Returns:
            One dictionary per contact returned by the repository.
        """
        async with self.db_manager.get_session() as session:
            repo = ContactRepository(session, self.tenant_id)
            contacts = await repo.list_by_domain(domain)
            return [self._contact_to_dict(c) for c in contacts]

    async def check_suppression(self, supp_type: str, value: str) -> bool:
        """Check if an email/domain is suppressed.

        Args:
            supp_type: Suppression type forwarded to the repository.
            value: Value to look up.

        Returns:
            Whatever ``SuppressionRepository.check`` reports.
        """
        async with self.db_manager.get_session() as session:
            repo = SuppressionRepository(session, self.tenant_id)
            return await repo.check(supp_type, value)

    async def save_handoff(self, packet: Dict) -> str:
        """Save a handoff packet.

        If ``packet`` has no ``"id"``, a UUID4 string is generated and
        stored in the caller's dictionary before the packet is created.

        Returns:
            ``"saved"``.
        """
        async with self.db_manager.get_session() as session:
            repo = HandoffRepository(session, self.tenant_id)
            if "id" not in packet:
                packet["id"] = str(uuid.uuid4())
            await repo.create(packet)
            logger.debug("Created handoff: %s", packet["id"])
            return "saved"

    async def clear_all(self) -> str:
        """Clear all data (use with caution!).

        Issues one ``DELETE`` per table in ``_CLEAR_ORDER`` filtered on
        ``tenant_id``, then commits.

        NOTE(review): with ``tenant_id=None`` the filter value is the empty
        string, so rows stored with a NULL tenant would not match — confirm
        how default-tenant rows are stored.
        NOTE(review): the statements are passed to ``session.execute`` as
        plain strings; recent SQLAlchemy versions expect ``text(...)`` for
        textual SQL — confirm against the session type in use.

        Returns:
            ``"cleared"``.
        """
        logger.warning(
            "Clearing all data for tenant: %s", self.tenant_id or "default"
        )
        tenant = self.tenant_id or ""

        async with self.db_manager.get_session() as session:
            # Table names come from a fixed class constant, never from
            # caller input; the tenant value stays a bound parameter.
            for table in self._CLEAR_ORDER:
                await session.execute(
                    f"DELETE FROM {table} WHERE tenant_id = :tenant",
                    {"tenant": tenant},
                )
            await session.commit()

        logger.info("All data cleared")
        return "cleared"

    @staticmethod
    def _iso(value) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when ``value`` is falsy."""
        return value.isoformat() if value else None

    def _company_to_dict(self, company: "Company") -> Dict:
        """Convert Company model to dictionary.

        Falsy JSON-like fields are replaced by ``{}`` and timestamps are
        rendered with ``isoformat()``.

        NOTE(review): ``metadata`` is a reserved attribute on SQLAlchemy
        declarative classes; if the model is declarative this may read the
        table ``MetaData`` rather than a column — confirm against the model.
        """
        return {
            "id": company.id,
            "name": company.name,
            "domain": company.domain,
            "description": company.description,
            "industry": company.industry,
            "employee_count": company.employee_count,
            "founded_year": company.founded_year,
            "revenue_range": company.revenue_range,
            "funding": company.funding,
            "headquarters_city": company.headquarters_city,
            "headquarters_state": company.headquarters_state,
            "headquarters_country": company.headquarters_country,
            "tech_stack": company.tech_stack or {},
            "social_profiles": company.social_profiles or {},
            "metadata": company.metadata or {},
            "is_active": company.is_active,
            "created_at": self._iso(company.created_at),
            "updated_at": self._iso(company.updated_at),
        }

    def _prospect_to_dict(self, prospect: "Prospect") -> Dict:
        """Convert Prospect model to dictionary.

        Adds a nested ``"company"`` entry when the prospect exposes a
        truthy ``company`` attribute.
        """
        result = {
            "id": prospect.id,
            "company_id": prospect.company_id,
            "fit_score": prospect.fit_score,
            "engagement_score": prospect.engagement_score,
            "intent_score": prospect.intent_score,
            "overall_score": prospect.overall_score,
            "status": prospect.status,
            "stage": prospect.stage,
            "last_contacted_at": self._iso(prospect.last_contacted_at),
            "last_replied_at": self._iso(prospect.last_replied_at),
            "emails_sent_count": prospect.emails_sent_count,
            "emails_opened_count": prospect.emails_opened_count,
            "emails_replied_count": prospect.emails_replied_count,
            "personalized_pitch": prospect.personalized_pitch,
            "pain_points": prospect.pain_points or {},
            "value_propositions": prospect.value_propositions or {},
            "source": prospect.source,
            "enrichment_data": prospect.enrichment_data or {},
            "metadata": prospect.metadata or {},
            "is_suppressed": prospect.is_suppressed,
            "created_at": self._iso(prospect.created_at),
            "updated_at": self._iso(prospect.updated_at),
        }

        # Include company data if loaded
        company = getattr(prospect, "company", None)
        if company:
            result["company"] = self._company_to_dict(company)
        return result

    def _contact_to_dict(self, contact: "Contact") -> Dict:
        """Convert Contact model to dictionary.

        Falsy JSON-like fields are replaced by ``{}`` and timestamps are
        rendered with ``isoformat()``.
        """
        return {
            "id": contact.id,
            "company_id": contact.company_id,
            "email": contact.email,
            "first_name": contact.first_name,
            "last_name": contact.last_name,
            "full_name": contact.full_name,
            "title": contact.title,
            "department": contact.department,
            "seniority": contact.seniority,
            "phone": contact.phone,
            "linkedin_url": contact.linkedin_url,
            "twitter_url": contact.twitter_url,
            "email_valid": contact.email_valid,
            "email_deliverability_score": contact.email_deliverability_score,
            "is_role_based": contact.is_role_based,
            "enrichment_data": contact.enrichment_data or {},
            "metadata": contact.metadata or {},
            "is_active": contact.is_active,
            "is_primary_contact": contact.is_primary_contact,
            "created_at": self._iso(contact.created_at),
            "updated_at": self._iso(contact.updated_at),
        }