cx_ai_agent_v1 / services /prospect_discovery.py
muzakkirhussain011's picture
Add application files (text files only)
8bab08d
"""
Prospect Discovery Service
Uses web search to find decision-makers and contacts at a company
"""
from typing import List, Optional, Dict, TYPE_CHECKING
import re
import logging
from email_validator import validate_email, EmailNotValidError
from services.web_search import get_search_service
from services.enhanced_contact_finder import get_enhanced_contact_finder, EnhancedContactFinder
from app.schema import Contact
import uuid
if TYPE_CHECKING:
from mcp.registry import MCPRegistry
# Module-level logger, named after this module's import path (__name__).
logger = logging.getLogger(__name__)
class ProspectDiscoveryService:
    """
    Discovers decision-makers and contacts at a company using web search.

    Contacts come from three sources, tried in this order:
      1. the enhanced contact finder,
      2. a basic title-by-title web search (only if the enhanced finder raises),
      3. generated placeholder contacts that fill any remaining slots.

    Supports MCP (Model Context Protocol) for a unified search interface.
    """

    # Placeholder names used when search does not yield enough contacts,
    # keyed by job title.
    _FALLBACK_NAMES = {
        "CEO": ["Sarah Johnson", "Michael Chen", "David Martinez", "Emily Williams"],
        "Founder": ["Alex Thompson", "Jessica Lee", "Robert Garcia", "Maria Rodriguez"],
        "Head of Customer Success": ["Daniel Kim", "Priya Singh", "Christopher Brown", "Nicole Davis"],
        "CX Manager": ["Amanda Wilson", "James Taylor", "Laura Anderson", "Kevin Moore"],
        "VP Customer Experience": ["Olivia Martinez", "Noah Patel", "Sophia Lee", "Jackson Rivera"],
        "Director of CX": ["Henry Walker", "Isabella Nguyen", "Lucas Adams", "Chloe Wilson"],
        "Chief Customer Officer": ["Amelia Clark", "James Wright", "Mila Turner", "Benjamin Scott"],
        "SVP Customer Success": ["Charlotte King", "William Brooks", "Zoe Parker", "Logan Hughes"],
        "VP CX": ["Harper Bell", "Elijah Foster", "Layla Reed", "Oliver Evans"],
        "Director Customer Experience": ["Emma Thomas", "Mason White", "Ava Harris", "Ethan Martin"],
        "Head of Support": ["Lily Jackson", "Ryan Lewis", "Grace Robinson", "Nathan Walker"],
    }

    # Pool used for any title that has no dedicated entry above.
    _DEFAULT_FALLBACK_NAMES = ["Alex Morgan", "Jordan Smith", "Taylor Johnson", "Casey Brown"]

    def __init__(self, mcp_registry: Optional['MCPRegistry'] = None):
        """
        Initialize prospect discovery service

        Args:
            mcp_registry: Optional MCP registry for unified search (recommended).
                If None (or falsy), falls back to the direct services.
        """
        if mcp_registry:
            # Use MCP search client
            self.search = mcp_registry.get_search_client()
            # Enhanced finder also needs MCP
            self.enhanced_finder = EnhancedContactFinder(mcp_registry=mcp_registry)
            logger.info("ProspectDiscoveryService initialized with MCP")
        else:
            # Fallback to direct services (legacy)
            self.search = get_search_service()
            self.enhanced_finder = get_enhanced_contact_finder()
            logger.warning("ProspectDiscoveryService initialized without MCP (consider using MCP)")

        # Title variations for decision-makers, keyed by size category
        # (see _get_size_category for the thresholds).
        self.target_titles = {
            'small': ['CEO', 'Founder', 'Head of Customer Success', 'CX Manager'],
            'medium': ['VP Customer Experience', 'Director of CX', 'Head of Support', 'Chief Customer Officer'],
            'large': ['Chief Customer Officer', 'SVP Customer Success', 'VP CX', 'VP Customer Experience', 'Director Customer Experience']
        }

    async def discover_contacts(
        self,
        company_name: str,
        domain: str,
        company_size: int,
        max_contacts: int = 3,
        skip_search: bool = False
    ) -> List["Contact"]:
        """
        Discover decision-maker contacts at a company

        Args:
            company_name: Name of the company
            domain: Company domain
            company_size: Number of employees
            max_contacts: Maximum contacts to return
            skip_search: If True, skip web search and only generate fallback contacts

        Returns:
            At most ``max_contacts`` Contact objects. Contacts obtained from
            search come first; any remaining slots are filled with generated
            placeholder contacts whose names come from a fixed pool and whose
            emails are pattern-generated (syntax-checked only, deliverability
            is not verified).
        """
        logger.info(f"ProspectDiscovery: Finding REAL contacts at '{company_name}'")

        contacts = []
        seen_emails = set()

        # Target titles depend on the company size category
        target_titles = self.target_titles[self._get_size_category(company_size)]

        if skip_search:
            logger.info("ProspectDiscovery: Skipping web search (skip_search=True)")
        else:
            logger.info("ProspectDiscovery: Using ENHANCED contact finder (LinkedIn + Team pages + AI)")
            try:
                # Use enhanced finder to get real contacts
                enhanced_contacts = await self.enhanced_finder.find_real_contacts(
                    company_name=company_name,
                    domain=domain,
                    target_titles=target_titles,
                    max_contacts=max_contacts
                )
                for contact in enhanced_contacts:
                    # Never exceed the requested number of contacts
                    if len(contacts) >= max_contacts:
                        break
                    # A contact without an email cannot be deduplicated or used;
                    # skip it instead of failing the whole enhanced path.
                    email_key = (contact.email or "").lower()
                    if not email_key or email_key in seen_emails:
                        continue
                    contacts.append(contact)
                    seen_emails.add(email_key)
                    logger.info(f"ProspectDiscovery: Found REAL contact: {contact.name} ({contact.title}) - {contact.email}")
            except Exception as e:
                logger.warning(f"ProspectDiscovery: Enhanced finder failed, falling back to basic search: {str(e)}")
                # Fallback to basic search if enhanced finder fails
                for title in target_titles[:max_contacts]:
                    # Contacts collected before the failure still count
                    if len(contacts) >= max_contacts:
                        break
                    try:
                        contact = await self._find_contact_for_title(
                            company_name,
                            domain,
                            title,
                            seen_emails
                        )
                        if contact:
                            contacts.append(contact)
                            seen_emails.add(contact.email.lower())
                            logger.info(f"ProspectDiscovery: Found {title} via basic search")
                    except Exception as e2:
                        logger.error(f"ProspectDiscovery: Error finding {title}: {str(e2)}")
                        continue

        # Everything collected so far came from search; remember the count
        # before placeholders are appended so the summary log is accurate.
        real_count = len(contacts)

        # If we didn't find enough contacts through search, generate plausible ones
        if len(contacts) < max_contacts:
            logger.warning(f"ProspectDiscovery: Only found {len(contacts)} real contacts, generating {max_contacts - len(contacts)} fallback contacts")
            covered_titles = {c.title for c in contacts}
            remaining_titles = [t for t in target_titles if t not in covered_titles]
            for title in remaining_titles[:max_contacts - len(contacts)]:
                fallback_contact = self._generate_fallback_contact(
                    company_name,
                    domain,
                    title,
                    seen_emails
                )
                if fallback_contact:
                    contacts.append(fallback_contact)
                    seen_emails.add(fallback_contact.email.lower())

        logger.info(f"ProspectDiscovery: Total {len(contacts)} contacts for '{company_name}' ({real_count} real)")
        return contacts

    async def _find_contact_for_title(
        self,
        company_name: str,
        domain: str,
        title: str,
        seen_emails: set
    ) -> Optional["Contact"]:
        """
        Search for a specific contact by title

        Runs several query phrasings in turn and returns the first result from
        which a name can be extracted and a not-yet-seen email generated.

        Args:
            company_name: Name of the company
            domain: Company domain used to build the email address
            title: Job title to look for
            seen_emails: Lower-cased emails already used (read only here)

        Returns:
            A Contact with an empty prospect_id, or None if nothing usable was found
        """
        # Search queries to find a person with this title at the company
        queries = [
            f"{title} at {company_name} linkedin",
            f"{company_name} {title} contact",
            f"{title} {company_name} email"
        ]
        for query in queries:
            try:
                results = await self.search.search(query, max_results=5)
                for result in results:
                    # Try to extract name from search results
                    name = self._extract_name_from_result(result, title)
                    if not name:
                        continue
                    # Generate email from name
                    email = self._generate_email(name, domain)
                    # Validate and dedupe
                    if email and email.lower() not in seen_emails:
                        return Contact(
                            id=str(uuid.uuid4()),
                            name=name,
                            email=email,
                            title=title,
                            prospect_id=""  # Will be set by caller
                        )
            except Exception as e:
                # Best effort: a failing query must not stop the remaining ones
                logger.debug(f"ProspectDiscovery: Search error for '{query}': {str(e)}")
                continue
        return None

    def _extract_name_from_result(self, result: Dict, title: str) -> Optional[str]:
        """
        Try to extract a person's name from search result

        Looks in the result's 'title' and 'body' fields for a two-word
        capitalized name directly followed by the job title, e.g.
        "John Smith, VP Customer Experience at...".

        Args:
            result: Search result mapping; missing or None fields are treated as empty
            title: Job title that must follow the name (matched case-insensitively)

        Returns:
            The extracted "First Last" name, or None if no pattern matched
        """
        text = (result.get('title') or '') + ' ' + (result.get('body') or '')

        # Each name word must start with a capital letter; inner capitals are
        # allowed so names like "McDonald" are kept whole. \b prevents a match
        # from starting in the middle of a word.
        name_part = r'\b([A-Z][a-zA-Z]+\s+[A-Z][a-zA-Z]+)'
        title_part = re.escape(title)
        # Only the separator and the title are case-insensitive. Applying
        # re.IGNORECASE to the whole pattern would also let lowercase words
        # such as "our new" pass as a name.
        patterns = [
            name_part + r'(?i:,?\s+' + title_part + r')',
            name_part + r'(?i:\s+is\s+' + title_part + r')',
            name_part + r'(?i:\s+-\s+' + title_part + r')',
        ]
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                name = match.group(1).strip()
                # Validate name (two words, reasonable length)
                parts = name.split()
                if len(parts) == 2 and all(2 <= len(p) <= 20 for p in parts):
                    return name
        return None

    @staticmethod
    def _validated_email(candidate: str) -> Optional[str]:
        """Return the normalized form of candidate, or None if its syntax is invalid."""
        try:
            # Syntax check only; no DNS/deliverability lookup is performed
            return validate_email(candidate, check_deliverability=False).normalized
        except EmailNotValidError:
            return None

    def _generate_email(self, name: str, domain: str) -> Optional[str]:
        """
        Generate email address from name and domain

        Uses the common first.last@domain format (or just the single word when
        the name has only one). Characters other than ASCII letters and
        whitespace are removed from the name first.

        Returns:
            The normalized email, or None if no name parts remain or the
            resulting address is not syntactically valid
        """
        parts = re.sub(r"[^a-zA-Z\s]", "", name).strip().lower().split()
        if not parts:
            return None
        prefix = f"{parts[0]}.{parts[-1]}" if len(parts) >= 2 else parts[0]
        return self._validated_email(f"{prefix}@{domain}")

    def _generate_fallback_contact(
        self,
        company_name: str,
        domain: str,
        title: str,
        seen_emails: set
    ) -> Optional["Contact"]:
        """
        Generate a plausible fallback contact

        The name is picked from a fixed pool for the title; the pick is
        deterministic for a given company name. The email is first.last@domain,
        or first-initial+last@domain if the first form is invalid or taken.

        Returns:
            A placeholder Contact with an empty prospect_id, or None if no
            valid, unused email could be produced or the Contact could not be built
        """
        pool = self._FALLBACK_NAMES.get(title, self._DEFAULT_FALLBACK_NAMES)

        # Use company name to deterministically select name
        company_hash = sum(ord(c) for c in company_name)
        name = pool[company_hash % len(pool)]

        email = self._generate_email(name, domain)
        if not email or email.lower() in seen_emails:
            # Try alternative format; it is validated like the primary one so
            # a bad domain cannot slip through here.
            parts = name.lower().split()
            if len(parts) >= 2:
                email = self._validated_email(f"{parts[0][0]}{parts[-1]}@{domain}")

        if not email or email.lower() in seen_emails:
            return None

        try:
            return Contact(
                id=str(uuid.uuid4()),
                name=name,
                email=email,
                title=title,
                prospect_id=""  # Will be set by caller
            )
        except Exception as e:
            logger.error(f"ProspectDiscovery: Error creating fallback contact: {str(e)}")
            return None

    def _get_size_category(self, company_size: int) -> str:
        """
        Categorize company by size

        Returns:
            'small' for fewer than 100 employees, 'medium' for fewer than
            1000, otherwise 'large'
        """
        if company_size < 100:
            return 'small'
        if company_size < 1000:
            return 'medium'
        return 'large'
# Shared service instance; stays None until the accessor below is first called.
_prospect_discovery: Optional[ProspectDiscoveryService] = None


def get_prospect_discovery_service() -> ProspectDiscoveryService:
    """
    Return the shared ProspectDiscoveryService, creating it on first use.

    The instance is built with no arguments, i.e. without an MCP registry.
    """
    global _prospect_discovery
    if _prospect_discovery is not None:
        return _prospect_discovery
    _prospect_discovery = ProspectDiscoveryService()
    return _prospect_discovery