EZOFIS-IRP / backend /app /apollo_service.py
Seth
update
ced5eff
"""
Apollo.io API service for creating contacts, enriching contact data, and adding them to sequences.
Reference:
- Create contact: https://docs.apollo.io/reference/create-a-contact
- Add to sequence: https://docs.apollo.io/reference/add-contacts-to-sequence
- Enrich person: https://docs.apollo.io/reference/enrich-people-data
"""
import os
import httpx
from typing import Optional, Dict, Any
APOLLO_API_KEY = os.environ.get("APOLLO_API_KEY", "")
APOLLO_API_URL = "https://api.apollo.io/api/v1"
APOLLO_TRIAL_LIST_NAME = "VPR TRIAL LEADS"
# Allow list ID to be set directly via environment variable (more reliable than lookup)
APOLLO_TRIAL_LIST_ID = os.environ.get("APOLLO_TRIAL_LIST_ID", None)
# Sequence ID for adding contacts to email sequences (preferred over lists)
APOLLO_TRIAL_SEQUENCE_ID = os.environ.get("APOLLO_TRIAL_SEQUENCE_ID", None)
async def get_list_id(list_name: Optional[str] = None) -> Optional[str]:
"""
Get Apollo list ID. First tries environment variable, then attempts API lookup.
Args:
list_name: Name of the list (for lookup if env var not set)
Returns:
List ID as string if found, None otherwise
"""
# First, try to use the list ID from environment variable (most reliable)
if APOLLO_TRIAL_LIST_ID:
# Apollo list IDs are typically hexadecimal strings (MongoDB ObjectIds)
# Accept them as strings, just strip whitespace
list_id = str(APOLLO_TRIAL_LIST_ID).strip()
if list_id:
print(f"[INFO] Using Apollo list ID from environment variable: {list_id}")
return list_id
else:
print(f"[WARNING] APOLLO_TRIAL_LIST_ID is empty")
# If no env var, try to look up by name (this may not work if API endpoint is different)
if not list_name or not APOLLO_API_KEY:
return None
# Note: The /lists endpoint may not be available in all Apollo API versions
# Try alternative: search for lists using a different endpoint
try:
async with httpx.AsyncClient() as client:
# Try the lists endpoint (may return 404 in some API versions)
response = await client.get(
f"{APOLLO_API_URL}/lists",
headers={
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"X-Api-Key": APOLLO_API_KEY
},
timeout=10.0
)
if response.status_code == 200:
data = response.json()
lists = data.get("lists", [])
for list_item in lists:
if list_item.get("name") == list_name:
list_id = list_item.get("id")
print(f"[INFO] Found Apollo list '{list_name}' with ID: {list_id}")
# Return as string (Apollo IDs are typically hex strings)
return str(list_id) if list_id else None
print(f"[WARNING] Apollo list '{list_name}' not found in available lists")
else:
print(f"[WARNING] Apollo lists endpoint returned {response.status_code}, cannot lookup list by name")
except Exception as e:
print(f"[WARNING] Failed to fetch Apollo list ID: {str(e)}")
return None
async def add_contact_to_sequence(contact_id: str, sequence_id: str) -> bool:
"""
Add a contact to an Apollo.io email sequence.
Args:
contact_id: The Apollo contact ID
sequence_id: The Apollo sequence ID
Returns:
True if contact was successfully added to sequence, False otherwise
"""
if not APOLLO_API_KEY:
print("[WARNING] APOLLO_API_KEY not set, skipping sequence enrollment")
return False
try:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{APOLLO_API_URL}/sequence_contacts",
headers={
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"X-Api-Key": APOLLO_API_KEY
},
json={
"sequence_id": sequence_id,
"contact_id": contact_id
},
timeout=10.0
)
if response.status_code in [200, 201]:
print(f"[INFO] Successfully added contact {contact_id} to sequence {sequence_id}")
return True
else:
error_data = response.text
print(f"[ERROR] Failed to add contact to sequence: {response.status_code} - {error_data}")
return False
except httpx.HTTPStatusError as e:
print(f"[ERROR] Apollo API HTTP error adding to sequence: {e.response.status_code} - {e.response.text}")
return False
except Exception as e:
print(f"[ERROR] Failed to add contact to sequence: {str(e)}")
return False
async def create_apollo_contact(
email: str,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
organization_name: Optional[str] = None,
title: Optional[str] = None,
list_name: Optional[str] = None,
sequence_id: Optional[str] = None
) -> bool:
"""
Create a contact in Apollo.io and optionally add to a sequence or list.
Args:
email: Contact email address (required)
first_name: Contact first name
last_name: Contact last name
organization_name: Organization name
title: Job title
list_name: Name of the list to add contact to (defaults to APOLLO_TRIAL_LIST_NAME)
sequence_id: ID of the sequence to add contact to (preferred over list)
Returns:
True if contact created successfully, False otherwise
Raises:
ValueError: If APOLLO_API_KEY is not set
"""
if not APOLLO_API_KEY:
print("[WARNING] APOLLO_API_KEY not set, skipping Apollo contact creation")
return False
# Use default list name if not provided
if list_name is None:
list_name = APOLLO_TRIAL_LIST_NAME
# Parse name if full name is provided but first/last are not
if not first_name and not last_name:
# Try to extract from email or use email prefix
email_prefix = email.split('@')[0]
if '.' in email_prefix:
parts = email_prefix.split('.')
first_name = parts[0].capitalize() if parts else None
last_name = parts[1].capitalize() if len(parts) > 1 else None
else:
first_name = email_prefix.capitalize()
# Extract organization domain from email
organization_domain = None
if '@' in email:
organization_domain = email.split('@')[1]
# Prepare contact data
contact_data: Dict[str, Any] = {
"email": email.lower(),
"run_dedupe": True # Prevent duplicate contacts
}
if first_name:
contact_data["first_name"] = first_name
if last_name:
contact_data["last_name"] = last_name
if organization_name:
contact_data["organization_name"] = organization_name
if organization_domain:
contact_data["organization_domain"] = organization_domain
if title:
contact_data["title"] = title
try:
async with httpx.AsyncClient() as client:
# Get the list ID if list_name is provided
list_ids = []
target_list_id = None # Store for later use
if list_name:
list_id = await get_list_id(list_name)
if list_id:
target_list_id = list_id # Store for verification later
# Apollo API accepts list_ids as an array of strings (hex IDs)
list_ids = [str(list_id)]
contact_data["list_ids"] = list_ids
print(f"[INFO] Adding contact to list ID: {list_id}")
else:
print(f"[WARNING] Could not find list '{list_name}'. Set APOLLO_TRIAL_LIST_ID environment variable with the list ID, or create contact without list assignment")
# Log the payload being sent (for debugging)
print(f"[DEBUG] Creating Apollo contact with payload: {contact_data}")
# Create the contact
response = await client.post(
f"{APOLLO_API_URL}/contacts",
headers={
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"X-Api-Key": APOLLO_API_KEY
},
json=contact_data,
timeout=10.0
)
# Log the full response for debugging
print(f"[DEBUG] Apollo API response status: {response.status_code}")
try:
response_json = response.json()
print(f"[DEBUG] Apollo API response (full): {response_json}")
except:
print(f"[DEBUG] Apollo API response body (text): {response.text[:1000]}") # First 1000 chars
if response.status_code == 200 or response.status_code == 201:
result = response.json()
contact = result.get("contact", {})
contact_id = contact.get("id")
print(f"[INFO] Successfully created Apollo contact: {email} (ID: {contact_id})")
# Priority: Add to sequence if sequence_id is provided (this is supported by API)
target_sequence_id = sequence_id or APOLLO_TRIAL_SEQUENCE_ID
if contact_id and target_sequence_id:
print(f"[INFO] Adding contact to sequence: {target_sequence_id}")
sequence_success = await add_contact_to_sequence(contact_id, target_sequence_id)
if sequence_success:
print(f"[INFO] ✓ Contact successfully enrolled in sequence")
else:
print(f"[WARNING] Failed to add contact to sequence, but contact was created")
# Fallback: Try to add to list (API limitation - may not work)
if list_ids and contact_id and target_list_id and not target_sequence_id:
print(f"[INFO] Contact created with list_ids parameter: {list_ids}")
print(f"[INFO] ⚠️ Apollo.io API Limitation: The API does not return list_ids in responses,")
print(f"[INFO] so we cannot verify if the contact was added to the list via API.")
print(f"[INFO] Please verify manually in Apollo.io that contact '{email}' is in list '{list_name or target_list_id}'")
print(f"[INFO] Consider using sequences instead (APOLLO_TRIAL_SEQUENCE_ID) for better API support.")
return True
else:
error_data = response.text
print(f"[ERROR] Failed to create Apollo contact: {response.status_code} - {error_data}")
return False
except httpx.HTTPStatusError as e:
print(f"[ERROR] Apollo API HTTP error: {e.response.status_code} - {e.response.text}")
return False
except Exception as e:
print(f"[ERROR] Failed to create Apollo contact: {str(e)}")
return False
async def enrich_contact_by_email(email: str) -> Optional[Dict[str, Any]]:
"""
Enrich contact data from Apollo.io using email address.
Args:
email: Contact email address
Returns:
Dictionary with enriched contact data, or None if not found
"""
if not APOLLO_API_KEY:
print("[WARNING] APOLLO_API_KEY not set, skipping Apollo enrichment")
return None
try:
async with httpx.AsyncClient() as client:
# Try people/match endpoint first (for exact email match)
print(f"[DEBUG] Attempting Apollo.io enrichment for {email} via /people/match endpoint")
response = await client.post(
f"{APOLLO_API_URL}/people/match",
headers={
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"X-Api-Key": APOLLO_API_KEY
},
json={
"email": email.lower()
# Note: reveal_phone_number requires webhook_url, so we skip it for now
},
timeout=10.0
)
print(f"[DEBUG] Apollo.io /people/match response status: {response.status_code}")
if response.status_code == 200:
data = response.json()
print(f"[DEBUG] Apollo.io /people/match response data keys: {list(data.keys())}")
person = data.get("person", {})
if person:
print(f"[DEBUG] Found person data in Apollo.io response")
# Extract enriched data
enriched_data = {
"first_name": person.get("first_name"),
"last_name": person.get("last_name"),
"title": person.get("title"),
"phone_number": person.get("phone_numbers", [{}])[0].get("raw_number") if person.get("phone_numbers") else None,
"linkedin_url": person.get("linkedin_url"),
"headline": person.get("headline"),
"organization_name": person.get("organization", {}).get("name") if person.get("organization") else None,
"organization_website": person.get("organization", {}).get("website_url") if person.get("organization") else None,
"organization_address": None, # May need to parse from organization data
}
# Try to get organization address
if person.get("organization"):
org = person.get("organization", {})
address_parts = []
if org.get("street_address"):
address_parts.append(org.get("street_address"))
if org.get("city"):
address_parts.append(org.get("city"))
if org.get("state"):
address_parts.append(org.get("state"))
if org.get("postal_code"):
address_parts.append(org.get("postal_code"))
if org.get("country"):
address_parts.append(org.get("country"))
if address_parts:
enriched_data["organization_address"] = ", ".join(address_parts)
print(f"[INFO] Successfully enriched contact data for {email} from Apollo.io")
return enriched_data
else:
print(f"[DEBUG] Apollo.io /people/match returned 200 but no person data found")
elif response.status_code == 404:
print(f"[DEBUG] Apollo.io /people/match returned 404 - contact not found in database")
elif response.status_code == 401:
print(f"[ERROR] Apollo.io API authentication failed - check your API key")
try:
error_data = response.json()
print(f"[ERROR] Apollo.io error details: {error_data}")
except:
print(f"[ERROR] Apollo.io error response: {response.text}")
else:
print(f"[DEBUG] Apollo.io /people/match returned status {response.status_code}")
try:
error_data = response.json()
print(f"[DEBUG] Apollo.io response: {error_data}")
except:
print(f"[DEBUG] Apollo.io response text: {response.text[:500]}")
# If match fails, try the new search endpoint (api_search)
print(f"[DEBUG] Attempting Apollo.io enrichment for {email} via /mixed_people/api_search endpoint")
search_response = await client.post(
f"{APOLLO_API_URL}/mixed_people/api_search",
headers={
"Content-Type": "application/json",
"Cache-Control": "no-cache",
"X-Api-Key": APOLLO_API_KEY
},
json={
"email": email.lower(),
"per_page": 1
},
timeout=10.0
)
print(f"[DEBUG] Apollo.io /mixed_people/api_search response status: {search_response.status_code}")
if search_response.status_code == 200:
search_data = search_response.json()
print(f"[DEBUG] Apollo.io /mixed_people/api_search response data keys: {list(search_data.keys())}")
people = search_data.get("people", [])
print(f"[DEBUG] Found {len(people)} people in search results")
if people:
person = people[0]
# Extract enriched data (same structure as above)
enriched_data = {
"first_name": person.get("first_name"),
"last_name": person.get("last_name"),
"title": person.get("title"),
"phone_number": person.get("phone_numbers", [{}])[0].get("raw_number") if person.get("phone_numbers") else None,
"linkedin_url": person.get("linkedin_url"),
"headline": person.get("headline"),
"organization_name": person.get("organization", {}).get("name") if person.get("organization") else None,
"organization_website": person.get("organization", {}).get("website_url") if person.get("organization") else None,
"organization_address": None,
}
if person.get("organization"):
org = person.get("organization", {})
address_parts = []
if org.get("street_address"):
address_parts.append(org.get("street_address"))
if org.get("city"):
address_parts.append(org.get("city"))
if org.get("state"):
address_parts.append(org.get("state"))
if org.get("postal_code"):
address_parts.append(org.get("postal_code"))
if org.get("country"):
address_parts.append(org.get("country"))
if address_parts:
enriched_data["organization_address"] = ", ".join(address_parts)
print(f"[INFO] Successfully enriched contact data for {email} from Apollo.io (via search)")
return enriched_data
else:
print(f"[DEBUG] Apollo.io /mixed_people/api_search returned 200 but no people in results")
elif search_response.status_code == 404:
print(f"[DEBUG] Apollo.io /mixed_people/api_search returned 404 - contact not found")
elif search_response.status_code == 401:
print(f"[ERROR] Apollo.io API authentication failed on search - check your API key")
try:
error_data = search_response.json()
print(f"[ERROR] Apollo.io search error details: {error_data}")
except:
print(f"[ERROR] Apollo.io search error response: {search_response.text}")
else:
print(f"[DEBUG] Apollo.io /mixed_people/api_search returned status {search_response.status_code}")
try:
error_data = search_response.json()
print(f"[DEBUG] Apollo.io search response: {error_data}")
except:
print(f"[DEBUG] Apollo.io search response text: {search_response.text[:500]}")
print(f"[INFO] No contact data found in Apollo.io for {email} - contact may not exist in Apollo's database")
return None
except httpx.HTTPStatusError as e:
print(f"[ERROR] Apollo API HTTP error during enrichment: {e.response.status_code} - {e.response.text}")
return None
except Exception as e:
print(f"[ERROR] Failed to enrich contact from Apollo.io: {str(e)}")
return None