""" Database-Backed Store Service for MCP Server Replaces JSON file storage with enterprise-grade SQL database """ import uuid import logging from typing import Dict, List, Optional, Any from datetime import datetime from .engine import get_db_manager from .repositories import ( CompanyRepository, ProspectRepository, ContactRepository, FactRepository, ActivityRepository, SuppressionRepository, HandoffRepository ) from .models import Company, Prospect, Contact, Fact, Suppression, Handoff logger = logging.getLogger(__name__) class DatabaseStoreService: """ Database-backed store service with enterprise features: - SQL database with ACID guarantees - Connection pooling - Tenant isolation - Audit logging - Transaction management """ def __init__(self, tenant_id: Optional[str] = None): self.db_manager = get_db_manager() self.tenant_id = tenant_id logger.info(f"Database store service initialized (tenant: {tenant_id or 'default'})") async def save_prospect(self, prospect: Dict) -> str: """Save or update a prospect""" async with self.db_manager.get_session() as session: repo = ProspectRepository(session, self.tenant_id) # Check if exists existing = await repo.get_by_id(prospect["id"]) if existing: # Update existing await repo.update(prospect["id"], prospect) logger.debug(f"Updated prospect: {prospect['id']}") else: # Create new await repo.create(prospect) logger.debug(f"Created prospect: {prospect['id']}") return "saved" async def get_prospect(self, prospect_id: str) -> Optional[Dict]: """Get a prospect by ID""" async with self.db_manager.get_session() as session: repo = ProspectRepository(session, self.tenant_id) prospect = await repo.get_by_id(prospect_id, load_relationships=True) if prospect: return self._prospect_to_dict(prospect) return None async def list_prospects(self) -> List[Dict]: """List all prospects""" async with self.db_manager.get_session() as session: repo = ProspectRepository(session, self.tenant_id) prospects = await repo.list(limit=1000) return [self._prospect_to_dict(p) for p in prospects] async def save_company(self, company: Dict) -> str: """Save or update a company""" async with self.db_manager.get_session() as session: repo = CompanyRepository(session, self.tenant_id) # Check if exists existing = await repo.get_by_id(company["id"]) if existing: # Update existing await repo.update(company["id"], company) logger.debug(f"Updated company: {company['id']}") else: # Create new await repo.create(company) logger.debug(f"Created company: {company['id']}") return "saved" async def get_company(self, company_id: str) -> Optional[Dict]: """Get a company by ID""" async with self.db_manager.get_session() as session: repo = CompanyRepository(session, self.tenant_id) company = await repo.get_by_id(company_id) if company: return self._company_to_dict(company) return None async def save_fact(self, fact: Dict) -> str: """Save a fact""" async with self.db_manager.get_session() as session: repo = FactRepository(session, self.tenant_id) # Check if exists by ID try: query = session.query(Fact).filter(Fact.id == fact["id"]) if self.tenant_id: query = query.filter(Fact.tenant_id == self.tenant_id) existing = await session.execute(query) if existing.scalar_one_or_none(): logger.debug(f"Fact already exists: {fact['id']}") return "saved" except: pass # Create new fact await repo.create(fact) logger.debug(f"Created fact: {fact['id']}") return "saved" async def save_contact(self, contact: Dict) -> str: """Save a contact""" async with self.db_manager.get_session() as session: repo = ContactRepository(session, self.tenant_id) # Check if exists by email email = contact.get("email", "").lower() if email: existing = await repo.get_by_email(email) if existing: logger.warning(f"Contact already exists: {email}") return "duplicate_skipped" # Check if exists by ID if "id" in contact: existing = await repo.get_by_id(contact["id"]) if existing: logger.debug(f"Updating contact: {contact['id']}") # Update logic here if needed return "saved" # Create new contact await repo.create(contact) logger.debug(f"Created contact: {contact['id']}") return "saved" async def list_contacts_by_domain(self, domain: str) -> List[Dict]: """List contacts by domain""" async with self.db_manager.get_session() as session: repo = ContactRepository(session, self.tenant_id) contacts = await repo.list_by_domain(domain) return [self._contact_to_dict(c) for c in contacts] async def check_suppression(self, supp_type: str, value: str) -> bool: """Check if an email/domain is suppressed""" async with self.db_manager.get_session() as session: repo = SuppressionRepository(session, self.tenant_id) is_suppressed = await repo.check(supp_type, value) return is_suppressed async def save_handoff(self, packet: Dict) -> str: """Save a handoff packet""" async with self.db_manager.get_session() as session: repo = HandoffRepository(session, self.tenant_id) # Generate ID if not present if "id" not in packet: packet["id"] = str(uuid.uuid4()) await repo.create(packet) logger.debug(f"Created handoff: {packet['id']}") return "saved" async def clear_all(self) -> str: """Clear all data (use with caution!)""" logger.warning(f"Clearing all data for tenant: {self.tenant_id or 'default'}") async with self.db_manager.get_session() as session: # Delete in order to respect foreign keys await session.execute( "DELETE FROM activities WHERE tenant_id = :tenant", {"tenant": self.tenant_id or ""} ) await session.execute( "DELETE FROM handoffs WHERE tenant_id = :tenant", {"tenant": self.tenant_id or ""} ) await session.execute( "DELETE FROM facts WHERE tenant_id = :tenant", {"tenant": self.tenant_id or ""} ) await session.execute( "DELETE FROM contacts WHERE tenant_id = :tenant", {"tenant": self.tenant_id or ""} ) await session.execute( "DELETE FROM prospects WHERE tenant_id = :tenant", {"tenant": self.tenant_id or ""} ) await session.execute( "DELETE FROM companies WHERE tenant_id = :tenant", {"tenant": self.tenant_id or ""} ) await session.commit() logger.info("All data cleared") return "cleared" def _company_to_dict(self, company: Company) -> Dict: """Convert Company model to dictionary""" return { "id": company.id, "name": company.name, "domain": company.domain, "description": company.description, "industry": company.industry, "employee_count": company.employee_count, "founded_year": company.founded_year, "revenue_range": company.revenue_range, "funding": company.funding, "headquarters_city": company.headquarters_city, "headquarters_state": company.headquarters_state, "headquarters_country": company.headquarters_country, "tech_stack": company.tech_stack or {}, "social_profiles": company.social_profiles or {}, "metadata": company.metadata or {}, "is_active": company.is_active, "created_at": company.created_at.isoformat() if company.created_at else None, "updated_at": company.updated_at.isoformat() if company.updated_at else None, } def _prospect_to_dict(self, prospect: Prospect) -> Dict: """Convert Prospect model to dictionary""" result = { "id": prospect.id, "company_id": prospect.company_id, "fit_score": prospect.fit_score, "engagement_score": prospect.engagement_score, "intent_score": prospect.intent_score, "overall_score": prospect.overall_score, "status": prospect.status, "stage": prospect.stage, "last_contacted_at": prospect.last_contacted_at.isoformat() if prospect.last_contacted_at else None, "last_replied_at": prospect.last_replied_at.isoformat() if prospect.last_replied_at else None, "emails_sent_count": prospect.emails_sent_count, "emails_opened_count": prospect.emails_opened_count, "emails_replied_count": prospect.emails_replied_count, "personalized_pitch": prospect.personalized_pitch, "pain_points": prospect.pain_points or {}, "value_propositions": prospect.value_propositions or {}, "source": prospect.source, "enrichment_data": prospect.enrichment_data or {}, "metadata": prospect.metadata or {}, "is_suppressed": prospect.is_suppressed, "created_at": prospect.created_at.isoformat() if prospect.created_at else None, "updated_at": prospect.updated_at.isoformat() if prospect.updated_at else None, } # Include company data if loaded if hasattr(prospect, 'company') and prospect.company: result["company"] = self._company_to_dict(prospect.company) return result def _contact_to_dict(self, contact: Contact) -> Dict: """Convert Contact model to dictionary""" return { "id": contact.id, "company_id": contact.company_id, "email": contact.email, "first_name": contact.first_name, "last_name": contact.last_name, "full_name": contact.full_name, "title": contact.title, "department": contact.department, "seniority": contact.seniority, "phone": contact.phone, "linkedin_url": contact.linkedin_url, "twitter_url": contact.twitter_url, "email_valid": contact.email_valid, "email_deliverability_score": contact.email_deliverability_score, "is_role_based": contact.is_role_based, "enrichment_data": contact.enrichment_data or {}, "metadata": contact.metadata or {}, "is_active": contact.is_active, "is_primary_contact": contact.is_primary_contact, "created_at": contact.created_at.isoformat() if contact.created_at else None, "updated_at": contact.updated_at.isoformat() if contact.updated_at else None, }