Spaces:
Sleeping
Sleeping
| """ | |
| Apollo.io API service for creating contacts, enriching contact data, and adding them to sequences. | |
| Reference: | |
| - Create contact: https://docs.apollo.io/reference/create-a-contact | |
| - Add to sequence: https://docs.apollo.io/reference/add-contacts-to-sequence | |
| - Enrich person: https://docs.apollo.io/reference/enrich-people-data | |
| """ | |
| import os | |
| import httpx | |
| from typing import Optional, Dict, Any | |
# Apollo credentials and targets are read once at import time. Values are
# stripped because secrets pasted into environment settings often carry a
# trailing space or newline, which would corrupt the X-Api-Key header or the
# IDs sent to the API.
APOLLO_API_KEY = os.environ.get("APOLLO_API_KEY", "").strip()
APOLLO_API_URL = "https://api.apollo.io/api/v1"
APOLLO_TRIAL_LIST_NAME = "VPR TRIAL LEADS"
# Allow list ID to be set directly via environment variable (more reliable than lookup).
# Unset or blank values become None.
APOLLO_TRIAL_LIST_ID = os.environ.get("APOLLO_TRIAL_LIST_ID", "").strip() or None
# Sequence ID for adding contacts to email sequences (preferred over lists).
# Unset or blank values become None.
APOLLO_TRIAL_SEQUENCE_ID = os.environ.get("APOLLO_TRIAL_SEQUENCE_ID", "").strip() or None
async def get_list_id(list_name: Optional[str] = None) -> Optional[str]:
    """
    Resolve the Apollo list ID to attach new contacts to.

    The APOLLO_TRIAL_LIST_ID module constant wins when it holds a non-blank
    value; in that case ``list_name`` is not consulted at all. Otherwise the
    ``/lists`` endpoint is queried and the first entry whose ``name`` equals
    ``list_name`` is used.

    Args:
        list_name: Name of the list to look up when no ID is configured
    Returns:
        The list ID as a string, or None when nothing is configured, the
        lookup cannot be attempted (no name or no API key), the list is not
        found, or the request fails.
    """
    # Preferred path: an ID supplied through configuration.
    if APOLLO_TRIAL_LIST_ID:
        configured_id = str(APOLLO_TRIAL_LIST_ID).strip()
        if not configured_id:
            print("[WARNING] APOLLO_TRIAL_LIST_ID is empty")
        else:
            print(f"[INFO] Using Apollo list ID from environment variable: {configured_id}")
            return configured_id

    # A lookup by name needs both a name and credentials.
    if not (list_name and APOLLO_API_KEY):
        return None

    request_headers = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "X-Api-Key": APOLLO_API_KEY,
    }

    # The /lists endpoint may not exist in every Apollo API version, so any
    # failure here is logged and treated as "not found".
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{APOLLO_API_URL}/lists",
                headers=request_headers,
                timeout=10.0,
            )
            if response.status_code != 200:
                print(f"[WARNING] Apollo lists endpoint returned {response.status_code}, cannot lookup list by name")
            else:
                available = response.json().get("lists", [])
                match = next(
                    (entry for entry in available if entry.get("name") == list_name),
                    None,
                )
                if match is not None:
                    found_id = match.get("id")
                    print(f"[INFO] Found Apollo list '{list_name}' with ID: {found_id}")
                    # IDs are handed back as strings; a missing ID yields None.
                    return str(found_id) if found_id else None
                print(f"[WARNING] Apollo list '{list_name}' not found in available lists")
    except Exception as err:
        print(f"[WARNING] Failed to fetch Apollo list ID: {str(err)}")
    return None
async def add_contact_to_sequence(contact_id: str, sequence_id: str) -> bool:
    """
    Add a contact to an Apollo.io email sequence.

    Posts the contact/sequence pair to ``{APOLLO_API_URL}/sequence_contacts``.
    Failures are logged and reported through the return value; request errors
    are caught rather than propagated.

    Args:
        contact_id: The Apollo contact ID
        sequence_id: The Apollo sequence ID
    Returns:
        True if Apollo answered with HTTP 200 or 201, False otherwise (blank
        IDs, missing API key, any other status code, or a request failure).
    """
    # Reject blank IDs up front instead of sending a request that cannot succeed.
    if not contact_id or not sequence_id:
        print("[WARNING] contact_id and sequence_id are required, skipping sequence enrollment")
        return False
    if not APOLLO_API_KEY:
        print("[WARNING] APOLLO_API_KEY not set, skipping sequence enrollment")
        return False

    request_headers = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "X-Api-Key": APOLLO_API_KEY,
    }
    payload = {"sequence_id": sequence_id, "contact_id": contact_id}

    try:
        async with httpx.AsyncClient() as client:
            # NOTE(review): confirm this path and payload against Apollo's
            # "Add Contacts to a Sequence" reference before relying on it.
            response = await client.post(
                f"{APOLLO_API_URL}/sequence_contacts",
                headers=request_headers,
                json=payload,
                timeout=10.0,
            )
    except Exception as e:
        # Best-effort call: timeouts, connection errors, etc. are logged, not raised.
        # (No separate HTTPStatusError handler: raise_for_status() is never
        # called, so non-2xx responses are handled via status_code below.)
        print(f"[ERROR] Failed to add contact to sequence: {str(e)}")
        return False

    if response.status_code in (200, 201):
        print(f"[INFO] Successfully added contact {contact_id} to sequence {sequence_id}")
        return True

    print(f"[ERROR] Failed to add contact to sequence: {response.status_code} - {response.text}")
    return False
async def create_apollo_contact(
    email: str,
    first_name: Optional[str] = None,
    last_name: Optional[str] = None,
    organization_name: Optional[str] = None,
    title: Optional[str] = None,
    list_name: Optional[str] = None,
    sequence_id: Optional[str] = None
) -> bool:
    """
    Create a contact in Apollo.io and optionally add it to a sequence or list.

    When neither ``first_name`` nor ``last_name`` is given, a name is guessed
    from the email's local part ("jane.doe@x.com" -> "Jane" / "Doe"). The
    email's domain is sent as ``organization_domain``. After a successful
    create, the contact is enrolled in ``sequence_id`` (or
    APOLLO_TRIAL_SEQUENCE_ID) when one is available; an enrollment failure is
    logged but does not change the return value.

    Args:
        email: Contact email address (required, must contain "@")
        first_name: Contact first name
        last_name: Contact last name
        organization_name: Organization name
        title: Job title
        list_name: Name of the list to add contact to (defaults to APOLLO_TRIAL_LIST_NAME)
        sequence_id: ID of the sequence to add contact to (preferred over list)
    Returns:
        True if Apollo answered the create request with HTTP 200 or 201,
        False otherwise (invalid email, missing API key, any other status
        code, or a request failure). No exception is raised for these cases.
    """
    # Validate the one required input before doing any work.
    normalized_email = (email or "").strip()
    if "@" not in normalized_email:
        print("[WARNING] A valid email address is required, skipping Apollo contact creation")
        return False
    if not APOLLO_API_KEY:
        print("[WARNING] APOLLO_API_KEY not set, skipping Apollo contact creation")
        return False

    # Use default list name if not provided
    if list_name is None:
        list_name = APOLLO_TRIAL_LIST_NAME

    address_pieces = normalized_email.split("@")
    local_part = address_pieces[0]
    organization_domain = address_pieces[1]

    # No name supplied: guess one from the part of the email before the "@".
    if not first_name and not last_name:
        if "." in local_part:
            name_parts = local_part.split(".")
            first_name = name_parts[0].capitalize()
            last_name = name_parts[1].capitalize()
        else:
            first_name = local_part.capitalize()

    # Prepare contact data; optional fields are only sent when non-empty.
    contact_data: Dict[str, Any] = {
        "email": normalized_email.lower(),
        "run_dedupe": True,  # Prevent duplicate contacts
    }
    optional_fields = {
        "first_name": first_name,
        "last_name": last_name,
        "organization_name": organization_name,
        "organization_domain": organization_domain,
        "title": title,
    }
    contact_data.update({key: value for key, value in optional_fields.items() if value})

    request_headers = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "X-Api-Key": APOLLO_API_KEY,
    }

    try:
        async with httpx.AsyncClient() as client:
            # Resolve the target list (if any) and attach it to the payload.
            list_ids = []
            target_list_id = None
            if list_name:
                list_id = await get_list_id(list_name)
                if list_id:
                    target_list_id = list_id
                    # List IDs are sent as an array of strings
                    list_ids = [str(list_id)]
                    contact_data["list_ids"] = list_ids
                    print(f"[INFO] Adding contact to list ID: {list_id}")
                else:
                    print(f"[WARNING] Could not find list '{list_name}'. Set APOLLO_TRIAL_LIST_ID environment variable with the list ID, or create contact without list assignment")

            # Log the payload being sent (for debugging)
            print(f"[DEBUG] Creating Apollo contact with payload: {contact_data}")

            response = await client.post(
                f"{APOLLO_API_URL}/contacts",
                headers=request_headers,
                json=contact_data,
                timeout=10.0,
            )

            # Parse the body exactly once; a non-JSON body must not turn a
            # successful create into a reported failure.
            print(f"[DEBUG] Apollo API response status: {response.status_code}")
            try:
                response_json = response.json()
                print(f"[DEBUG] Apollo API response (full): {response_json}")
            except ValueError:
                response_json = None
                print(f"[DEBUG] Apollo API response body (text): {response.text[:1000]}")  # First 1000 chars

            if response.status_code not in (200, 201):
                print(f"[ERROR] Failed to create Apollo contact: {response.status_code} - {response.text}")
                return False

            # Tolerate a missing or null "contact" object in the response.
            result = response_json if isinstance(response_json, dict) else {}
            contact = result.get("contact") or {}
            contact_id = contact.get("id") if isinstance(contact, dict) else None
            print(f"[INFO] Successfully created Apollo contact: {normalized_email} (ID: {contact_id})")
            if not contact_id:
                print("[WARNING] Apollo response did not include a contact ID; sequence enrollment is not possible")

            # Priority: add to a sequence when one is available.
            target_sequence_id = sequence_id or APOLLO_TRIAL_SEQUENCE_ID
            if contact_id and target_sequence_id:
                print(f"[INFO] Adding contact to sequence: {target_sequence_id}")
                sequence_success = await add_contact_to_sequence(contact_id, target_sequence_id)
                if sequence_success:
                    print(f"[INFO] ✓ Contact successfully enrolled in sequence")
                else:
                    print(f"[WARNING] Failed to add contact to sequence, but contact was created")

            # Fallback: list membership was requested via the payload only and
            # cannot be confirmed from the response, so ask for a manual check.
            if list_ids and contact_id and target_list_id and not target_sequence_id:
                print(f"[INFO] Contact created with list_ids parameter: {list_ids}")
                print(f"[INFO] ⚠️ Apollo.io API Limitation: The API does not return list_ids in responses,")
                print(f"[INFO] so we cannot verify if the contact was added to the list via API.")
                print(f"[INFO] Please verify manually in Apollo.io that contact '{normalized_email}' is in list '{list_name or target_list_id}'")
                print(f"[INFO] Consider using sequences instead (APOLLO_TRIAL_SEQUENCE_ID) for better API support.")
            return True
    except Exception as e:
        # Best-effort integration: request failures are logged, never raised.
        # (raise_for_status() is never called, so there is no HTTPStatusError
        # to handle separately.)
        print(f"[ERROR] Failed to create Apollo contact: {str(e)}")
        return False
def _extract_enriched_person(person: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten an Apollo person record into the enrichment dict returned to callers."""
    organization = person.get("organization") or {}
    phone_numbers = person.get("phone_numbers") or []
    first_phone = phone_numbers[0] if phone_numbers else None

    # Join whichever address components are present, in postal order.
    address_parts = [
        organization.get(key)
        for key in ("street_address", "city", "state", "postal_code", "country")
        if organization.get(key)
    ]

    return {
        "first_name": person.get("first_name"),
        "last_name": person.get("last_name"),
        "title": person.get("title"),
        "phone_number": first_phone.get("raw_number") if isinstance(first_phone, dict) else None,
        "linkedin_url": person.get("linkedin_url"),
        "headline": person.get("headline"),
        "organization_name": organization.get("name"),
        "organization_website": organization.get("website_url"),
        "organization_address": ", ".join(address_parts) if address_parts else None,
    }


def _log_response_body(
    response: Any,
    json_prefix: str,
    text_prefix: str,
    max_chars: Optional[int] = None,
) -> None:
    """Print a response body as parsed JSON, or as (optionally truncated) text if it is not JSON."""
    try:
        print(f"{json_prefix}: {response.json()}")
    except ValueError:
        body = response.text if max_chars is None else response.text[:max_chars]
        print(f"{text_prefix}: {body}")


async def enrich_contact_by_email(email: str) -> Optional[Dict[str, Any]]:
    """
    Enrich contact data from Apollo.io using email address.

    Tries ``/people/match`` first; if that yields no person, falls back to
    ``/mixed_people/api_search`` and uses the first result. Failures are
    logged and reported as None; request errors are caught, not propagated.

    Args:
        email: Contact email address
    Returns:
        Dictionary with keys first_name, last_name, title, phone_number,
        linkedin_url, headline, organization_name, organization_website and
        organization_address (each possibly None), or None if the email is
        blank, the API key is missing, nothing was found, or a request failed.
    """
    lookup_email = (email or "").strip().lower()
    if not lookup_email:
        print("[WARNING] No email provided, skipping Apollo enrichment")
        return None
    if not APOLLO_API_KEY:
        print("[WARNING] APOLLO_API_KEY not set, skipping Apollo enrichment")
        return None

    request_headers = {
        "Content-Type": "application/json",
        "Cache-Control": "no-cache",
        "X-Api-Key": APOLLO_API_KEY,
    }

    try:
        async with httpx.AsyncClient() as client:
            # Try people/match endpoint first (for exact email match)
            print(f"[DEBUG] Attempting Apollo.io enrichment for {email} via /people/match endpoint")
            response = await client.post(
                f"{APOLLO_API_URL}/people/match",
                headers=request_headers,
                # reveal_phone_number is not requested here; the original
                # author's note says it requires a webhook_url.
                json={"email": lookup_email},
                timeout=10.0,
            )
            print(f"[DEBUG] Apollo.io /people/match response status: {response.status_code}")

            if response.status_code == 200:
                data = response.json()
                print(f"[DEBUG] Apollo.io /people/match response data keys: {list(data.keys())}")
                person = data.get("person") or {}
                if person:
                    print(f"[DEBUG] Found person data in Apollo.io response")
                    enriched_data = _extract_enriched_person(person)
                    print(f"[INFO] Successfully enriched contact data for {email} from Apollo.io")
                    return enriched_data
                print(f"[DEBUG] Apollo.io /people/match returned 200 but no person data found")
            elif response.status_code == 404:
                print(f"[DEBUG] Apollo.io /people/match returned 404 - contact not found in database")
            elif response.status_code == 401:
                print(f"[ERROR] Apollo.io API authentication failed - check your API key")
                _log_response_body(
                    response,
                    "[ERROR] Apollo.io error details",
                    "[ERROR] Apollo.io error response",
                )
            else:
                print(f"[DEBUG] Apollo.io /people/match returned status {response.status_code}")
                _log_response_body(
                    response,
                    "[DEBUG] Apollo.io response",
                    "[DEBUG] Apollo.io response text",
                    max_chars=500,
                )

            # Match did not produce a person: fall back to the search endpoint.
            # NOTE(review): the first hit is used without checking that it
            # belongs to `email`; confirm this endpoint really filters on the
            # "email" field, otherwise an unrelated person could be returned.
            print(f"[DEBUG] Attempting Apollo.io enrichment for {email} via /mixed_people/api_search endpoint")
            search_response = await client.post(
                f"{APOLLO_API_URL}/mixed_people/api_search",
                headers=request_headers,
                json={"email": lookup_email, "per_page": 1},
                timeout=10.0,
            )
            print(f"[DEBUG] Apollo.io /mixed_people/api_search response status: {search_response.status_code}")

            if search_response.status_code == 200:
                search_data = search_response.json()
                print(f"[DEBUG] Apollo.io /mixed_people/api_search response data keys: {list(search_data.keys())}")
                # A null "people" value is treated the same as an empty list.
                people = search_data.get("people") or []
                print(f"[DEBUG] Found {len(people)} people in search results")
                if people:
                    enriched_data = _extract_enriched_person(people[0])
                    print(f"[INFO] Successfully enriched contact data for {email} from Apollo.io (via search)")
                    return enriched_data
                print(f"[DEBUG] Apollo.io /mixed_people/api_search returned 200 but no people in results")
            elif search_response.status_code == 404:
                print(f"[DEBUG] Apollo.io /mixed_people/api_search returned 404 - contact not found")
            elif search_response.status_code == 401:
                print(f"[ERROR] Apollo.io API authentication failed on search - check your API key")
                _log_response_body(
                    search_response,
                    "[ERROR] Apollo.io search error details",
                    "[ERROR] Apollo.io search error response",
                )
            else:
                print(f"[DEBUG] Apollo.io /mixed_people/api_search returned status {search_response.status_code}")
                _log_response_body(
                    search_response,
                    "[DEBUG] Apollo.io search response",
                    "[DEBUG] Apollo.io search response text",
                    max_chars=500,
                )

            print(f"[INFO] No contact data found in Apollo.io for {email} - contact may not exist in Apollo's database")
            return None
    except Exception as e:
        # Best-effort enrichment: request/parse failures are logged, never raised.
        # (raise_for_status() is never called, so there is no HTTPStatusError
        # to handle separately.)
        print(f"[ERROR] Failed to enrich contact from Apollo.io: {str(e)}")
        return None