|
|
""" |
|
|
Test Contact Finder - Verify real person discovery works |
|
|
""" |
|
|
import asyncio |
|
|
import logging |
|
|
from services.enhanced_contact_finder import get_enhanced_contact_finder |
|
|
from services.prospect_discovery import get_prospect_discovery_service |
|
|
|
|
|
|
|
|
# Configure root logging once at import time so messages emitted while the
# checks run are shown with a timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-level logger used by the test coroutines below to report failures.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
async def test_enhanced_finder() -> None:
    """Test the enhanced contact finder with real companies.

    For each hard-coded company, awaits ``find_real_contacts`` on the object
    returned by ``get_enhanced_contact_finder()`` (requesting at most three
    contacts) and prints the name, title and email of every contact returned.
    Nothing is asserted or returned; an exception raised while handling one
    company is printed and logged with its traceback, and the loop moves on
    to the next company.
    """
    print("\n" + "="*80)
    print("TESTING ENHANCED CONTACT FINDER")
    print("="*80 + "\n")

    # Companies to look up, with the job titles passed as ``target_titles``.
    test_cases = [
        {
            "name": "Shopify",
            "domain": "shopify.com",
            "titles": ["CEO", "Chief Customer Officer", "VP Customer Experience"],
        },
        {
            "name": "Stripe",
            "domain": "stripe.com",
            "titles": ["CEO", "Head of Customer Success", "VP Support"],
        },
        {
            "name": "Airbnb",
            "domain": "airbnb.com",
            "titles": ["CEO", "Chief Customer Officer", "VP Community"],
        },
    ]

    finder = get_enhanced_contact_finder()

    for test in test_cases:
        print(f"\n{'-'*80}")
        print(f"Testing: {test['name']} ({test['domain']})")
        print(f"{'-'*80}\n")

        try:
            contacts = await finder.find_real_contacts(
                company_name=test['name'],
                domain=test['domain'],
                target_titles=test['titles'],
                max_contacts=3,
            )

            if contacts:
                print(f"[OK] Found {len(contacts)} REAL contacts:\n")
                for i, contact in enumerate(contacts, 1):
                    print(f" {i}. {contact.name}")
                    print(f" Title: {contact.title}")
                    print(f" Email: {contact.email}")
                    print()
            else:
                print("[FAIL] No contacts found (will use fallback)\n")

        except Exception as e:
            # Broad catch is deliberate: one failing company must not stop the
            # remaining lookups. logger.exception keeps the traceback.
            print(f"[ERROR] {e}\n")
            logger.exception("Error testing %s: %s", test['name'], e)
|
|
|
|
|
|
|
|
async def test_prospect_discovery() -> None:
    """Test the full prospect discovery service.

    For each hard-coded company, awaits ``discover_contacts`` on the object
    returned by ``get_prospect_discovery_service()`` (at most three contacts,
    ``skip_search=False``) and prints the name, title and email of every
    contact returned. Nothing is asserted or returned; an exception raised
    while handling one company is printed and logged with its traceback, and
    the loop moves on to the next company.
    """
    print("\n" + "="*80)
    print("TESTING PROSPECT DISCOVERY SERVICE")
    print("="*80 + "\n")

    # Companies to look up; ``size`` is forwarded as ``company_size``.
    test_cases = [
        {
            "name": "Zapier",
            "domain": "zapier.com",
            "size": 500,
        },
        {
            "name": "Notion",
            "domain": "notion.so",
            "size": 200,
        },
    ]

    discovery = get_prospect_discovery_service()

    for test in test_cases:
        print(f"\n{'-'*80}")
        print(f"Testing: {test['name']} ({test['domain']}) - {test['size']} employees")
        print(f"{'-'*80}\n")

        try:
            contacts = await discovery.discover_contacts(
                company_name=test['name'],
                domain=test['domain'],
                company_size=test['size'],
                max_contacts=3,
                skip_search=False,
            )

            if contacts:
                print(f"[OK] Found {len(contacts)} contacts:\n")
                for i, contact in enumerate(contacts, 1):
                    print(f" {i}. {contact.name}")
                    print(f" Title: {contact.title}")
                    print(f" Email: {contact.email}")
                    print()
            else:
                print("[FAIL] No contacts found\n")

        except Exception as e:
            # Broad catch is deliberate: one failing company must not stop the
            # remaining lookups. logger.exception keeps the traceback.
            print(f"[ERROR] {e}\n")
            logger.exception("Error testing %s: %s", test['name'], e)
|
|
|
|
|
|
|
|
async def test_email_generation_with_contacts() -> None:
    """Test that emails use contact names.

    Builds a ``Prospect`` holding one ``Company`` and two ``Contact`` objects,
    awaits ``Writer(MCPRegistry()).run(prospect)`` and prints the subject and
    body of the resulting ``email_draft``. It then reports whether the first
    contact's first name appears in the body. Nothing is asserted or
    returned; an exception raised while generating or inspecting the draft is
    printed and logged with its traceback.
    """
    print("\n" + "="*80)
    print("TESTING EMAIL GENERATION WITH CONTACTS")
    print("="*80 + "\n")

    # Imported here rather than at module level, as in the original; an
    # ImportError from these lines is not caught by the try block below.
    from app.schema import Prospect, Company, Contact
    from agents.writer import Writer
    from mcp.registry import MCPRegistry
    import uuid

    # Fixture prospect with known contact names to search for in the draft.
    prospect = Prospect(
        id=str(uuid.uuid4()),
        company=Company(
            id=str(uuid.uuid4()),
            name="Test E-commerce Co",
            domain="testecommerce.com",
            industry="E-commerce",
            size=150,
            pains=["High customer churn", "Poor support response times"],
            notes=["Growing fast, needs to scale support"],
        ),
        contacts=[
            Contact(
                id=str(uuid.uuid4()),
                name="Sarah Johnson",
                email="sarah.johnson@testecommerce.com",
                title="VP Customer Experience",
                prospect_id="",
            ),
            Contact(
                id=str(uuid.uuid4()),
                name="Michael Chen",
                email="michael.chen@testecommerce.com",
                title="Director of Customer Success",
                prospect_id="",
            ),
        ],
    )

    print(f"Company: {prospect.company.name}")
    print("Contacts:")
    for contact in prospect.contacts:
        print(f" - {contact.name} ({contact.title}) - {contact.email}")
    print()

    registry = MCPRegistry()
    writer = Writer(registry)

    print("Generating personalized email...\n")

    try:
        result = await writer.run(prospect)

        if result.email_draft:
            print("[OK] Email Generated:\n")
            print(f"Subject: {result.email_draft.get('subject', 'N/A')}")
            print(f"\nBody:\n{result.email_draft.get('body', 'N/A')}")
            print()

            # Treat a missing or None body as empty so the membership check
            # reports a personalization failure instead of raising TypeError.
            body = result.email_draft.get('body') or ''
            first_name = prospect.contacts[0].name.split()[0]

            if first_name in body:
                print(f"[OK] Contact first name '{first_name}' found in email!")
            else:
                print(f"[FAIL] Contact first name '{first_name}' NOT found in email!")
                print(" This means personalization failed!")
        else:
            print("[FAIL] No email draft generated")

    except Exception as e:
        # Broad catch is deliberate for a smoke test: report and continue.
        # logger.exception keeps the traceback.
        print(f"[ERROR] {e}")
        logger.exception("Error generating email: %s", e)
|
|
|
|
|
|
|
|
async def main() -> None:
    """Run all tests.

    Awaits the three test coroutines one after another, printing a numbered
    header before each and a completion banner at the end. An exception that
    escapes one of the coroutines propagates and stops the run.
    """
    print("\n[TEST] CONTACT FINDER TEST SUITE")
    print("=" * 80)

    print("\n\n[TEST 1] Enhanced Contact Finder")
    await test_enhanced_finder()

    print("\n\n[TEST 2] Prospect Discovery Service")
    await test_prospect_discovery()

    print("\n\n[TEST 3] Email Generation with Contacts")
    await test_email_generation_with_contacts()

    print("\n\n" + "="*80)
    print("[DONE] ALL TESTS COMPLETE")
    print("="*80 + "\n")
|
|
|
|
|
|
|
|
# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|