import gradio as gr import requests import json import pandas as pd import os from typing import Dict, List, Optional import time import random # ========================================== # APOLLO.IO API CLIENT CLASS # ========================================== class ApolloDataFetcher: """ Apollo.io API client for fetching people and company data """ def __init__(self, api_key: str = None): # Get API key from environment (Hugging Face secrets) or parameter self.api_key = api_key or os.getenv("APOLLO_API_KEY") self.base_url = "https://api.apollo.io/v1" self.headers = { "Content-Type": "application/json", "Cache-Control": "no-cache" } if self.api_key and self.api_key != "demo": self.headers["X-Api-Key"] = self.api_key # ========================================== # PEOPLE SEARCH METHOD # ========================================== def search_people(self, query: str, limit: int = 10) -> Dict: """Search for people using Apollo.io API""" if not self.api_key or self.api_key == "demo": return self._generate_demo_people_data(query, limit) endpoint = f"{self.base_url}/people/search" payload = { "q_keywords": query, "page": 1, "per_page": min(limit, 25), "person_locations": ["United States"], } try: response = requests.post(endpoint, headers=self.headers, json=payload, timeout=10) return self._handle_api_response(response) except requests.exceptions.Timeout: return {"error": "Request timeout. Please try again."} except Exception as e: return {"error": f"Request failed: {str(e)}"} # ========================================== # COMPANY SEARCH METHOD # ========================================== def search_companies(self, query: str, limit: int = 10) -> Dict: """Search for companies using Apollo.io API""" if not self.api_key or self.api_key == "demo": return self._generate_demo_company_data(query, limit) endpoint = f"{self.base_url}/organizations/search" payload = { "q_keywords": query, "page": 1, "per_page": min(limit, 25), "organization_locations": ["United States"], } try: response = requests.post(endpoint, headers=self.headers, json=payload, timeout=10) return self._handle_api_response(response) except requests.exceptions.Timeout: return {"error": "Request timeout. Please try again."} except Exception as e: return {"error": f"Request failed: {str(e)}"} # ========================================== # CONTACT ENRICHMENT METHOD # ========================================== def enrich_person(self, email: str) -> Dict: """Enrich person data by email""" if not self.api_key or self.api_key == "demo": return self._generate_demo_enriched_person(email) endpoint = f"{self.base_url}/people/match" payload = {"email": email} try: response = requests.post(endpoint, headers=self.headers, json=payload, timeout=10) return self._handle_api_response(response) except requests.exceptions.Timeout: return {"error": "Request timeout. Please try again."} except Exception as e: return {"error": f"Request failed: {str(e)}"} # ========================================== # EMAIL FINDER METHOD # ========================================== def find_email(self, first_name: str, last_name: str, domain: str) -> Dict: """Find email for a person""" if not self.api_key or self.api_key == "demo": return {"email": f"{first_name.lower()}.{last_name.lower()}@{domain}"} endpoint = f"{self.base_url}/email_accounts" payload = { "first_name": first_name, "last_name": last_name, "domain": domain } try: response = requests.post(endpoint, headers=self.headers, json=payload, timeout=10) return self._handle_api_response(response) except requests.exceptions.Timeout: return {"error": "Request timeout. Please try again."} except Exception as e: return {"error": f"Request failed: {str(e)}"} # ========================================== # API RESPONSE HANDLER # ========================================== def _handle_api_response(self, response) -> Dict: """Handle API response with proper error codes""" if response.status_code == 200: return response.json() elif response.status_code == 401: return {"error": "Invalid API key. Please check your Apollo.io API key."} elif response.status_code == 403: return {"error": "Access denied. Your API key may not have permission for this endpoint. Try upgrading your Apollo.io plan."} elif response.status_code == 429: return {"error": "Rate limit exceeded. Please try again later."} else: return {"error": f"API Error: {response.status_code} - {response.text}"} # ========================================== # DEMO DATA GENERATORS # ========================================== def _generate_demo_people_data(self, query: str, limit: int) -> Dict: """Generate realistic demo data for people search""" demo_people = [] job_titles = [ "Software Engineer", "Marketing Manager", "Sales Director", "CEO", "CTO", "VP Sales", "Product Manager", "Data Scientist", "UX Designer", "DevOps Engineer" ] companies = [ "TechCorp", "InnovateLtd", "DataSystems", "CloudNine", "AIStartup", "FinTechPro", "HealthTech", "EduSolutions", "GreenTech", "CyberSec" ] domains = [ "techcorp.com", "innovate.ltd", "datasys.com", "cloudnine.io", "aistartup.ai", "fintech.pro", "healthtech.com", "edusol.com", "greentech.co", "cybersec.net" ] first_names = ["John", "Jane", "Michael", "Sarah", "David", "Emily", "Robert", "Lisa", "James", "Maria"] last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia", "Miller", "Davis", "Rodriguez", "Martinez"] for i in range(min(limit, 10)): first_name = random.choice(first_names) last_name = random.choice(last_names) name = f"{first_name} {last_name}" company = random.choice(companies) domain = random.choice(domains) person = { "id": f"demo_{i+1}", "first_name": first_name, "last_name": last_name, "name": name, "title": random.choice(job_titles), "email": f"{first_name.lower()}.{last_name.lower()}@{domain}", "phone": f"+1 (555) {random.randint(100, 999)}-{random.randint(1000, 9999)}", "organization": { "name": company, "website_url": f"https://{domain}" }, "linkedin_url": f"https://linkedin.com/in/{first_name.lower()}-{last_name.lower()}", "city": random.choice(["San Francisco", "New York", "Austin", "Seattle", "Boston", "Los Angeles", "Chicago", "Miami"]), "state": random.choice(["CA", "NY", "TX", "WA", "MA", "IL", "FL"]), "country": "US" } demo_people.append(person) return { "people": demo_people, "pagination": {"page": 1, "per_page": limit, "total_entries": len(demo_people)} } def _generate_demo_company_data(self, query: str, limit: int) -> Dict: """Generate realistic demo data for company search""" demo_companies = [] industries = [ "Technology", "Healthcare", "Finance", "Education", "E-commerce", "Manufacturing", "Consulting", "Media", "Biotechnology", "Renewable Energy" ] company_suffixes = ["Corp", "Inc", "LLC", "Ltd", "Technologies", "Solutions", "Systems", "Labs", "Ventures", "Group"] for i in range(min(limit, 10)): if query and query.strip(): company_name = f"{query.title()} {random.choice(company_suffixes)}" else: company_name = f"Demo{random.choice(company_suffixes)} {i+1}" domain = f"{company_name.lower().replace(' ', '').replace('corp', '').replace('inc', '').replace('llc', '').replace('ltd', '')}.com" company = { "id": f"company_demo_{i+1}", "name": company_name, "website_url": f"https://{domain}", "industry": random.choice(industries), "employees_range": random.choice(["1-10", "11-50", "51-200", "201-500", "501-1000", "1000+", "2000+", "5000+"]), "estimated_num_employees": random.randint(10, 5000), "city": random.choice(["San Francisco", "New York", "Austin", "Seattle", "Boston", "Los Angeles", "Chicago", "Miami", "Denver", "Atlanta"]), "state": random.choice(["CA", "NY", "TX", "WA", "MA", "IL", "FL", "CO", "GA"]), "country": "US", "phone": f"+1 (555) {random.randint(100, 999)}-{random.randint(1000, 9999)}", "founded_year": random.randint(1990, 2023), "description": f"A leading {random.choice(industries).lower()} company focused on innovation and growth in the modern business landscape." } demo_companies.append(company) return { "organizations": demo_companies, "pagination": {"page": 1, "per_page": limit, "total_entries": len(demo_companies)} } def _generate_demo_enriched_person(self, email: str) -> Dict: """Generate demo enriched person data""" name_part = email.split('@')[0].replace('.', ' ').replace('_', ' ').title() domain = email.split('@')[1] first_name = name_part.split()[0] if name_part.split() else "John" last_name = name_part.split()[1] if len(name_part.split()) > 1 else "Doe" return { "person": { "id": "enriched_demo_1", "first_name": first_name, "last_name": last_name, "name": f"{first_name} {last_name}", "title": random.choice(["Senior Software Engineer", "Marketing Manager", "Sales Director", "Product Manager", "Data Scientist"]), "email": email, "phone": f"+1 (555) {random.randint(100, 999)}-{random.randint(1000, 9999)}", "organization": { "name": domain.split('.')[0].title() + " Corp", "website_url": f"https://{domain}" }, "linkedin_url": f"https://linkedin.com/in/{first_name.lower()}-{last_name.lower()}", "twitter_url": f"https://twitter.com/{first_name.lower()}{last_name.lower()}", "city": random.choice(["San Francisco", "New York", "Austin", "Seattle", "Boston"]), "state": random.choice(["CA", "NY", "TX", "WA", "MA"]), "country": "US", "employment_history": [ {"title": "Software Engineer", "organization_name": "Previous Corp", "start_date": "2020-01"}, {"title": "Junior Developer", "organization_name": "Startup Inc", "start_date": "2018-06"} ] } } # ========================================== # DATA FORMATTING FUNCTIONS # ========================================== def format_people_results(results: Dict) -> tuple: """Format people search results for display""" if "error" in results: return f"ā Error: {results['error']}", None # Handle both 'people' and 'data' keys (different API versions) people_data = results.get("people", results.get("data", [])) if not people_data: return "No results found.", None # Create formatted text output output_text = f"ā Found {len(people_data)} people:\n\n" # Create DataFrame for table table_data = [] for person in people_data: # Handle nested organization data org = person.get('organization', {}) or person.get('current_organization', {}) # Format text output name = person.get('name', f"{person.get('first_name', '')} {person.get('last_name', '')}").strip() output_text += f"š¤ **{name or 'N/A'}**\n" output_text += f" š§ Email: {person.get('email', 'N/A')}\n" output_text += f" š Phone: {person.get('phone', person.get('personal_phone', 'N/A'))}\n" output_text += f" š¼ Title: {person.get('title', 'N/A')}\n" output_text += f" š¢ Company: {org.get('name', 'N/A')}\n" output_text += f" š Location: {person.get('city', 'N/A')}, {person.get('state', 'N/A')}\n" output_text += f" š LinkedIn: {person.get('linkedin_url', 'N/A')}\n\n" # Add to table data table_data.append({ "Name": name or 'N/A', "Email": person.get('email', 'N/A'), "Phone": person.get('phone', person.get('personal_phone', 'N/A')), "Title": person.get('title', 'N/A'), "Company": org.get('name', 'N/A'), "Location": f"{person.get('city', 'N/A')}, {person.get('state', 'N/A')}" }) df = pd.DataFrame(table_data) return output_text, df def format_company_results(results: Dict) -> tuple: """Format company search results for display""" if "error" in results: return f"ā Error: {results['error']}", None # Handle both 'organizations' and 'data' keys companies_data = results.get("organizations", results.get("data", [])) if not companies_data: return "No results found.", None # Create formatted text output output_text = f"ā Found {len(companies_data)} companies:\n\n" # Create DataFrame for table table_data = [] for company in companies_data: # Format text output output_text += f"š¢ **{company.get('name', 'N/A')}**\n" output_text += f" š Website: {company.get('website_url', 'N/A')}\n" output_text += f" š Industry: {company.get('industry', company.get('primary_industry', 'N/A'))}\n" output_text += f" š„ Employees: {company.get('employees_range', company.get('estimated_num_employees', 'N/A'))}\n" output_text += f" š Phone: {company.get('phone', 'N/A')}\n" output_text += f" š Location: {company.get('city', 'N/A')}, {company.get('state', 'N/A')}\n" output_text += f" š Founded: {company.get('founded_year', 'N/A')}\n\n" # Add to table data table_data.append({ "Company": company.get('name', 'N/A'), "Website": company.get('website_url', 'N/A'), "Industry": company.get('industry', company.get('primary_industry', 'N/A')), "Employees": str(company.get('employees_range', company.get('estimated_num_employees', 'N/A'))), "Location": f"{company.get('city', 'N/A')}, {company.get('state', 'N/A')}", "Founded": str(company.get('founded_year', 'N/A')) }) df = pd.DataFrame(table_data) return output_text, df # ========================================== # INTERFACE FUNCTIONS # ========================================== def search_people_interface(query: str, limit: int): """Interface function for people search""" if not query.strip(): return "Please enter a search query.", None results = apollo_client.search_people(query, limit) return format_people_results(results) def search_companies_interface(query: str, limit: int): """Interface function for company search""" if not query.strip(): return "Please enter a search query.", None results = apollo_client.search_companies(query, limit) return format_company_results(results) def find_email_interface(first_name: str, last_name: str, domain: str): """Interface function for email finding""" if not all([first_name.strip(), last_name.strip(), domain.strip()]): return "Please enter first name, last name, and company domain." results = apollo_client.find_email(first_name, last_name, domain) if "error" in results: return f"ā Error: {results['error']}" if "email" in results: output = f"š§ **Email Found**\n\n" output += f"š¤ **Name:** {first_name} {last_name}\n" output += f"š§ **Email:** {results['email']}\n" output += f"š¢ **Domain:** {domain}\n" if results.get('confidence'): output += f"šÆ **Confidence:** {results['confidence']}\n" return output return "No email found for this person and domain combination." def enrich_person_interface(email: str): """Interface function for person enrichment""" if not email.strip() or "@" not in email: return "Please enter a valid email address." results = apollo_client.enrich_person(email) if "error" in results: return f"ā Error: {results['error']}" # Handle both 'person' and 'data' keys person = results.get("person", results.get("data")) if not person: return "No person data found for this email." output = f"š **Enriched Person Data**\n\n" # Handle nested organization data org = person.get('organization', {}) or person.get('current_organization', {}) name = person.get('name', f"{person.get('first_name', '')} {person.get('last_name', '')}").strip() output += f"š¤ **Name:** {name or 'N/A'}\n" output += f"š§ **Email:** {person.get('email', 'N/A')}\n" output += f"š **Phone:** {person.get('phone', person.get('personal_phone', 'N/A'))}\n" output += f"š¼ **Title:** {person.get('title', 'N/A')}\n" output += f"š¢ **Company:** {org.get('name', 'N/A')}\n" output += f"š **Company Website:** {org.get('website_url', 'N/A')}\n" output += f"š **Location:** {person.get('city', 'N/A')}, {person.get('state', 'N/A')}, {person.get('country', 'N/A')}\n" output += f"š **LinkedIn:** {person.get('linkedin_url', 'N/A')}\n" output += f"š¦ **Twitter:** {person.get('twitter_url', 'N/A')}\n" if person.get('employment_history'): output += f"\nš **Employment History:**\n" for job in person['employment_history']: output += f" ⢠{job.get('title', 'N/A')} at {job.get('organization_name', 'N/A')} (from {job.get('start_date', 'N/A')})\n" return output # ========================================== # INITIALIZE APOLLO CLIENT # ========================================== apollo_client = ApolloDataFetcher() # Check if API key is available api_key_status = "š Live API Mode" if apollo_client.api_key and apollo_client.api_key != "demo" else "š Demo Mode" # ========================================== # GRADIO INTERFACE # ========================================== with gr.Blocks(title="Apollo.io Data Fetcher", theme=gr.themes.Soft()) as demo: # ========================================== # HEADER SECTION # ========================================== gr.HTML(f"""
Search for people and companies, enrich contact data using Apollo.io's powerful database