# cx_ai_agent_v1 / agents / contactor.py
# muzakkirhussain011's picture
# Add application files (text files only)
# 8bab08d
# file: agents/contactor.py
"""
Contactor Agent - Discovers decision-makers at target companies
Now uses web search to find real contacts instead of generating mock data
"""
from app.schema import Prospect, Contact
from app.config import SKIP_WEB_SEARCH
import logging
from services.prospect_discovery import get_prospect_discovery_service
logger = logging.getLogger(__name__)
class Contactor:
    """
    Discovers decision-maker contacts for a prospect and persists them.

    Candidates come from the prospect discovery service. This class drops
    candidates whose email is already stored for the company domain or is
    on the suppression list, saves the remainder through the store client,
    and attaches them to the prospect.

    Whether discovery performs a live web search is controlled by
    ``SKIP_WEB_SEARCH``, which is forwarded as ``skip_search``.
    NOTE(review): any fallback to generated contacts presumably lives inside
    the discovery service; it is not implemented in this class.
    """

    def __init__(self, mcp_registry):
        """
        Args:
            mcp_registry: Registry exposing ``get_store_client()``; the
                returned client is used for suppression checks and for
                saving contacts and prospects.
        """
        self.mcp = mcp_registry
        self.store = mcp_registry.get_store_client()
        self.prospect_discovery = get_prospect_discovery_service()

    async def run(self, prospect: Prospect) -> Prospect:
        """
        Discover decision-maker contacts for ``prospect``.

        If the company domain is suppressed, the prospect is marked
        ``"dropped"`` (with ``dropped_reason`` set), saved, and returned
        without any discovery. Otherwise the newly accepted contacts are
        stored on ``prospect.contacts``, the status becomes ``"contacted"``
        (even when no contact was found), and the prospect is saved.

        Errors while listing existing contacts or discovering new ones are
        logged and treated as "nothing found"; errors from the domain
        suppression check or from saving the prospect propagate.

        Returns:
            The same ``prospect`` instance, updated in place.
        """
        company = prospect.company
        logger.info(f"Contactor: Finding contacts for '{company.name}'")

        # Check domain suppression first
        if await self.store.check_suppression("domain", company.domain):
            return await self._drop_suppressed(prospect)

        seen_emails = await self._load_seen_emails(company.domain)
        contacts = await self._discover_new_contacts(prospect, seen_emails)

        # Update prospect
        prospect.contacts = contacts
        prospect.status = "contacted"
        await self.store.save_prospect(prospect)
        logger.info(f"Contactor: Found {len(contacts)} contacts for '{company.name}'")
        return prospect

    async def _drop_suppressed(self, prospect):
        """Mark ``prospect`` as dropped because its domain is suppressed, save and return it."""
        domain = prospect.company.domain
        logger.warning(f"Contactor: Domain suppressed: {domain}")
        prospect.status = "dropped"
        prospect.dropped_reason = f"Domain suppressed: {domain}"
        await self.store.save_prospect(prospect)
        return prospect

    async def _load_seen_emails(self, domain) -> set:
        """Return the lower-cased emails already stored for ``domain`` (best effort)."""
        seen_emails = set()
        try:
            existing = await self.store.list_contacts_by_domain(domain)
            for contact in existing or []:
                # A stored contact may lack an email or carry None; skipping
                # it keeps one bad record from aborting the whole dedupe scan.
                email = getattr(contact, "email", None)
                if email:
                    seen_emails.add(email.lower())
        except Exception as e:
            # Best effort: dedupe against whatever was collected so far.
            logger.error(
                f"Contactor: Error fetching existing contacts: {str(e)}",
                exc_info=True,
            )
        return seen_emails

    async def _discover_new_contacts(self, prospect, seen_emails) -> list:
        """Discover, filter and save contacts; return the ones that were accepted.

        ``seen_emails`` is updated in place with every accepted email so that
        duplicates within a single discovery batch are also skipped.
        """
        company = prospect.company
        accepted = []
        try:
            # Determine number of contacts based on company size. A missing
            # size is treated as a small company so the comparison cannot raise.
            size = company.size
            max_contacts = 2 if (size or 0) < 100 else 3
            discovered = await self.prospect_discovery.discover_contacts(
                company_name=company.name,
                domain=company.domain,
                company_size=size,
                max_contacts=max_contacts,
                skip_search=SKIP_WEB_SEARCH,  # Respect SKIP_WEB_SEARCH flag
            )
        except Exception as e:
            logger.error(
                f"Contactor: Error discovering contacts: {str(e)}",
                exc_info=True,
            )
            # Continue with empty contacts list
            return accepted

        for contact in discovered or []:
            # Each contact is handled independently: a failure on one
            # (store error, malformed record) must not discard the others.
            try:
                email = getattr(contact, "email", None)
                if not email:
                    logger.warning("Contactor: Skipping contact without email")
                    continue
                email_lower = email.lower()

                # Skip if already seen
                if email_lower in seen_emails:
                    logger.info(f"Contactor: Skipping duplicate email: {contact.email}")
                    continue

                # Check email-level suppression
                if await self.store.check_suppression("email", contact.email):
                    logger.warning(f"Contactor: Email suppressed: {contact.email}")
                    continue

                # Set prospect ID, then save and add to list
                contact.prospect_id = prospect.id
                await self.store.save_contact(contact)
                accepted.append(contact)
                seen_emails.add(email_lower)
                logger.info(f"Contactor: Added contact: {contact.name} ({contact.title})")
            except Exception as e:
                logger.error(
                    f"Contactor: Error discovering contacts: {str(e)}",
                    exc_info=True,
                )
        return accepted