|
|
""" |
|
|
Database-Backed Store Service for MCP Server |
|
|
Replaces JSON file storage with enterprise-grade SQL database |
|
|
""" |
|
|
import uuid |
|
|
import logging |
|
|
from typing import Dict, List, Optional, Any |
|
|
from datetime import datetime |
|
|
|
|
|
from .engine import get_db_manager |
|
|
from .repositories import ( |
|
|
CompanyRepository, |
|
|
ProspectRepository, |
|
|
ContactRepository, |
|
|
FactRepository, |
|
|
ActivityRepository, |
|
|
SuppressionRepository, |
|
|
HandoffRepository |
|
|
) |
|
|
from .models import Company, Prospect, Contact, Fact, Suppression, Handoff |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
class DatabaseStoreService:
    """Tenant-aware store service backed by the SQL database layer.

    Every public coroutine opens a session from the shared database manager,
    builds the matching repository with this service's ``tenant_id`` and
    delegates the actual persistence to it.

    Features the underlying layer is intended to provide:
    - SQL database with ACID guarantees
    - Connection pooling
    - Tenant isolation
    - Audit logging
    - Transaction management
    """

    # Tables wiped by ``clear_all``, in execution order.
    # NOTE(review): the order presumably deletes child rows before their
    # parents to satisfy foreign keys - confirm against the schema. The
    # suppressions table is not part of this list (same as before).
    _CLEARABLE_TABLES = (
        "activities",
        "handoffs",
        "facts",
        "contacts",
        "prospects",
        "companies",
    )

    def __init__(self, tenant_id: Optional[str] = None):
        """Bind the service to the database manager and an optional tenant.

        Args:
            tenant_id: Tenant identifier handed to every repository created
                by this service. ``None`` is reported as ``'default'`` in
                log messages.
        """
        self.db_manager = get_db_manager()
        self.tenant_id = tenant_id
        logger.info(
            "Database store service initialized (tenant: %s)",
            tenant_id or "default",
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _isoformat(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when the value is falsy."""
        return value.isoformat() if value else None

    @staticmethod
    async def _upsert(repo: Any, record: Dict[str, Any], label: str) -> None:
        """Update ``record`` when its id already exists in ``repo``, else create it.

        Args:
            repo: Repository exposing ``get_by_id``, ``update`` and ``create``.
            record: Payload to persist; must contain an ``"id"`` key.
            label: Entity name used in the debug log message.

        Raises:
            KeyError: If ``record`` has no ``"id"`` key.
        """
        record_id = record["id"]
        if await repo.get_by_id(record_id):
            await repo.update(record_id, record)
            logger.debug("Updated %s: %s", label, record_id)
        else:
            await repo.create(record)
            logger.debug("Created %s: %s", label, record_id)

    # ------------------------------------------------------------------
    # Prospects
    # ------------------------------------------------------------------

    async def save_prospect(self, prospect: Dict[str, Any]) -> str:
        """Create the prospect, or update it when its id already exists.

        Args:
            prospect: Prospect payload; must contain an ``"id"`` key.

        Returns:
            The literal string ``"saved"``.

        Raises:
            KeyError: If ``prospect`` has no ``"id"`` key.
        """
        async with self.db_manager.get_session() as session:
            repo = ProspectRepository(session, self.tenant_id)
            await self._upsert(repo, prospect, "prospect")
            return "saved"

    async def get_prospect(self, prospect_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one prospect (with relationships loaded) as a dictionary.

        Args:
            prospect_id: Identifier of the prospect to look up.

        Returns:
            The serialized prospect, or ``None`` when the repository finds
            nothing.
        """
        async with self.db_manager.get_session() as session:
            repo = ProspectRepository(session, self.tenant_id)
            prospect = await repo.get_by_id(prospect_id, load_relationships=True)
            if not prospect:
                return None
            return self._prospect_to_dict(prospect)

    async def list_prospects(self, limit: int = 1000) -> List[Dict[str, Any]]:
        """List prospects as dictionaries.

        Args:
            limit: Maximum number of rows requested from the repository.
                Defaults to 1000, the value that was previously hard-coded.

        Returns:
            One serialized dictionary per prospect returned by the repository.
        """
        async with self.db_manager.get_session() as session:
            repo = ProspectRepository(session, self.tenant_id)
            prospects = await repo.list(limit=limit)
            return [self._prospect_to_dict(item) for item in prospects]

    # ------------------------------------------------------------------
    # Companies
    # ------------------------------------------------------------------

    async def save_company(self, company: Dict[str, Any]) -> str:
        """Create the company, or update it when its id already exists.

        Args:
            company: Company payload; must contain an ``"id"`` key.

        Returns:
            The literal string ``"saved"``.

        Raises:
            KeyError: If ``company`` has no ``"id"`` key.
        """
        async with self.db_manager.get_session() as session:
            repo = CompanyRepository(session, self.tenant_id)
            await self._upsert(repo, company, "company")
            return "saved"

    async def get_company(self, company_id: str) -> Optional[Dict[str, Any]]:
        """Fetch one company as a dictionary.

        Args:
            company_id: Identifier of the company to look up.

        Returns:
            The serialized company, or ``None`` when the repository finds
            nothing.
        """
        async with self.db_manager.get_session() as session:
            repo = CompanyRepository(session, self.tenant_id)
            company = await repo.get_by_id(company_id)
            if not company:
                return None
            return self._company_to_dict(company)

    # ------------------------------------------------------------------
    # Facts
    # ------------------------------------------------------------------

    async def save_fact(self, fact: Dict[str, Any]) -> str:
        """Persist a fact unless one with the same id is already stored.

        The existence check is best-effort: when it fails, the failure is
        logged and the fact is created anyway.

        Args:
            fact: Fact payload. Its ``"id"`` (when present) drives the
                duplicate check.

        Returns:
            The literal string ``"saved"``, both when the fact was created
            and when it already existed.
        """
        async with self.db_manager.get_session() as session:
            repo = FactRepository(session, self.tenant_id)

            fact_id = fact.get("id")
            if fact_id is not None:
                try:
                    # NOTE(review): if ``session`` is an SQLAlchemy
                    # AsyncSession, ``.query`` is not available and this
                    # check fails on every call - consider a ``select()``
                    # statement or a repository lookup instead.
                    query = session.query(Fact).filter(Fact.id == fact_id)
                    if self.tenant_id:
                        query = query.filter(Fact.tenant_id == self.tenant_id)
                    existing = await session.execute(query)
                    if existing.scalar_one_or_none():
                        logger.debug("Fact already exists: %s", fact_id)
                        return "saved"
                except Exception:
                    # Only ``Exception`` is caught so task cancellation and
                    # interpreter shutdown still propagate; the failure is
                    # logged instead of being silently discarded.
                    logger.warning(
                        "Could not check for existing fact %s; creating it anyway",
                        fact_id,
                        exc_info=True,
                    )

            await repo.create(fact)
            # Read the id defensively so a missing key cannot raise after
            # the row has already been written.
            logger.debug("Created fact: %s", fact.get("id"))
            return "saved"

    # ------------------------------------------------------------------
    # Contacts
    # ------------------------------------------------------------------

    async def save_contact(self, contact: Dict[str, Any]) -> str:
        """Persist a contact, skipping duplicates by e-mail address.

        Args:
            contact: Contact payload. ``"email"`` and ``"id"`` are both
                optional.

        Returns:
            ``"duplicate_skipped"`` when a contact with the same lower-cased
            e-mail already exists, otherwise ``"saved"``.
        """
        async with self.db_manager.get_session() as session:
            repo = ContactRepository(session, self.tenant_id)

            # ``or ""`` also covers an explicit ``None`` e-mail, which
            # ``.get("email", "")`` would pass straight to ``.lower()``.
            email = (contact.get("email") or "").lower()
            if email and await repo.get_by_email(email):
                logger.warning("Contact already exists: %s", email)
                return "duplicate_skipped"

            if "id" in contact and await repo.get_by_id(contact["id"]):
                # NOTE(review): nothing is written in this branch - the
                # existing row is left untouched although the message says
                # "Updating". Confirm whether a repository update is intended.
                logger.debug("Updating contact: %s", contact["id"])
                return "saved"

            await repo.create(contact)
            # ``id`` is optional above, so do not index it here: a KeyError
            # would surface after the contact was already created.
            logger.debug("Created contact: %s", contact.get("id"))
            return "saved"

    async def list_contacts_by_domain(self, domain: str) -> List[Dict[str, Any]]:
        """List the contacts the repository returns for ``domain``.

        Args:
            domain: Domain value forwarded to the repository.

        Returns:
            One serialized dictionary per contact.
        """
        async with self.db_manager.get_session() as session:
            repo = ContactRepository(session, self.tenant_id)
            contacts = await repo.list_by_domain(domain)
            return [self._contact_to_dict(item) for item in contacts]

    # ------------------------------------------------------------------
    # Suppression and handoffs
    # ------------------------------------------------------------------

    async def check_suppression(self, supp_type: str, value: str) -> bool:
        """Ask the suppression repository whether ``value`` is suppressed.

        Args:
            supp_type: Kind of suppression entry to check.
            value: Value to look up (for example an e-mail or a domain).

        Returns:
            Whatever the repository's ``check`` reports.
        """
        async with self.db_manager.get_session() as session:
            repo = SuppressionRepository(session, self.tenant_id)
            return await repo.check(supp_type, value)

    async def save_handoff(self, packet: Dict[str, Any]) -> str:
        """Persist a handoff packet, generating an id when none is supplied.

        The generated id is written into ``packet`` itself, so the caller
        can read it back after the call.

        Args:
            packet: Handoff payload; mutated in place when ``"id"`` is missing.

        Returns:
            The literal string ``"saved"``.
        """
        async with self.db_manager.get_session() as session:
            repo = HandoffRepository(session, self.tenant_id)

            if "id" not in packet:
                packet["id"] = str(uuid.uuid4())

            await repo.create(packet)
            logger.debug("Created handoff: %s", packet["id"])
            return "saved"

    # ------------------------------------------------------------------
    # Maintenance
    # ------------------------------------------------------------------

    async def clear_all(self) -> str:
        """Delete this tenant's rows from every clearable table (use with caution!).

        Returns:
            The literal string ``"cleared"``.
        """
        logger.warning(
            "Clearing all data for tenant: %s", self.tenant_id or "default"
        )

        # NOTE(review): with no tenant configured this binds the empty
        # string, so only rows whose tenant_id equals '' are removed -
        # confirm how tenant-less rows are actually stored.
        params = {"tenant": self.tenant_id or ""}

        async with self.db_manager.get_session() as session:
            for table in self._CLEARABLE_TABLES:
                # Table names come from the class constant above, never from
                # caller input; the tenant value is always a bound parameter.
                # NOTE(review): SQLAlchemy 2.x only accepts textual SQL
                # wrapped in ``text()`` - verify against the installed version.
                await session.execute(
                    f"DELETE FROM {table} WHERE tenant_id = :tenant",
                    params,
                )
            await session.commit()

        logger.info("All data cleared")
        return "cleared"

    # ------------------------------------------------------------------
    # Model -> dictionary converters
    # ------------------------------------------------------------------

    def _company_to_dict(self, company: "Company") -> Dict[str, Any]:
        """Convert a Company model to a plain dictionary.

        Falsy JSON-style fields become ``{}`` and timestamps become ISO-8601
        strings (or ``None``).
        """
        # NOTE(review): if Company is an SQLAlchemy declarative model,
        # ``metadata`` is the reserved table-registry attribute rather than
        # a column - confirm the model exposes the intended data here.
        return {
            "id": company.id,
            "name": company.name,
            "domain": company.domain,
            "description": company.description,
            "industry": company.industry,
            "employee_count": company.employee_count,
            "founded_year": company.founded_year,
            "revenue_range": company.revenue_range,
            "funding": company.funding,
            "headquarters_city": company.headquarters_city,
            "headquarters_state": company.headquarters_state,
            "headquarters_country": company.headquarters_country,
            "tech_stack": company.tech_stack or {},
            "social_profiles": company.social_profiles or {},
            "metadata": company.metadata or {},
            "is_active": company.is_active,
            "created_at": self._isoformat(company.created_at),
            "updated_at": self._isoformat(company.updated_at),
        }

    def _prospect_to_dict(self, prospect: "Prospect") -> Dict[str, Any]:
        """Convert a Prospect model to a plain dictionary.

        A ``"company"`` entry is added only when the prospect carries a
        truthy ``company`` attribute.
        """
        # NOTE(review): ``metadata`` may be the reserved SQLAlchemy
        # attribute rather than a column - confirm on the Prospect model.
        result = {
            "id": prospect.id,
            "company_id": prospect.company_id,
            "fit_score": prospect.fit_score,
            "engagement_score": prospect.engagement_score,
            "intent_score": prospect.intent_score,
            "overall_score": prospect.overall_score,
            "status": prospect.status,
            "stage": prospect.stage,
            "last_contacted_at": self._isoformat(prospect.last_contacted_at),
            "last_replied_at": self._isoformat(prospect.last_replied_at),
            "emails_sent_count": prospect.emails_sent_count,
            "emails_opened_count": prospect.emails_opened_count,
            "emails_replied_count": prospect.emails_replied_count,
            "personalized_pitch": prospect.personalized_pitch,
            "pain_points": prospect.pain_points or {},
            "value_propositions": prospect.value_propositions or {},
            "source": prospect.source,
            "enrichment_data": prospect.enrichment_data or {},
            "metadata": prospect.metadata or {},
            "is_suppressed": prospect.is_suppressed,
            "created_at": self._isoformat(prospect.created_at),
            "updated_at": self._isoformat(prospect.updated_at),
        }

        company = getattr(prospect, "company", None)
        if company:
            result["company"] = self._company_to_dict(company)

        return result

    def _contact_to_dict(self, contact: "Contact") -> Dict[str, Any]:
        """Convert a Contact model to a plain dictionary.

        Falsy JSON-style fields become ``{}`` and timestamps become ISO-8601
        strings (or ``None``).
        """
        # NOTE(review): ``metadata`` may be the reserved SQLAlchemy
        # attribute rather than a column - confirm on the Contact model.
        return {
            "id": contact.id,
            "company_id": contact.company_id,
            "email": contact.email,
            "first_name": contact.first_name,
            "last_name": contact.last_name,
            "full_name": contact.full_name,
            "title": contact.title,
            "department": contact.department,
            "seniority": contact.seniority,
            "phone": contact.phone,
            "linkedin_url": contact.linkedin_url,
            "twitter_url": contact.twitter_url,
            "email_valid": contact.email_valid,
            "email_deliverability_score": contact.email_deliverability_score,
            "is_role_based": contact.is_role_based,
            "enrichment_data": contact.enrichment_data or {},
            "metadata": contact.metadata or {},
            "is_active": contact.is_active,
            "is_primary_contact": contact.is_primary_contact,
            "created_at": self._isoformat(contact.created_at),
            "updated_at": self._isoformat(contact.updated_at),
        }
|
|
|