# LeadGenPro β€” lead_gen/scraper/email_finder.py
# Author: MaSTer-suFYan
# feat: LeadGen Pro v2.0 — full system with bug fixes
# commit: beec01d
"""
scraper/email_finder.py — Discover email addresses for business leads.
Three-stage approach:
1. Scrape common pages (/contact, /about, /team) for mailto links & text emails.
2. Try common prefix patterns (info@, contact@, hello@) on the domain.
3. Validate found emails via DNS MX record check.
"""
import re
import socket
from typing import List, Optional, Set
from urllib.parse import urljoin
try:
import dns.resolver
HAS_DNS = True
except ImportError:
HAS_DNS = False
try:
from bs4 import BeautifulSoup
HAS_BS4 = True
except ImportError:
HAS_BS4 = False
import requests
import config
from models import Lead
from utils.logger import get_logger
from utils.helpers import domain_from_url, normalise_url
logger = get_logger(__name__)
# Regex for extracting candidate email addresses from raw HTML/text.
# Matches local-part@domain.tld with a 2+ letter TLD; loose on purpose —
# candidates are tightened later by EmailFinder._is_valid_format.
_EMAIL_RE = re.compile(
    r'[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}',
    re.IGNORECASE,
)
# Domains whose addresses are never real business contacts — platform,
# CDN and social-widget noise commonly embedded in page markup.
_JUNK_DOMAINS = {
    "example.com", "sentry.io", "wixpress.com", "wordpress.com",
    "googleapis.com", "googleusercontent.com", "w3.org",
    "schema.org", "facebook.com", "twitter.com", "instagram.com",
}
# Local-parts that indicate unmonitored/system mailboxes (not worth
# contacting even when they sit on the lead's own domain).
_JUNK_PREFIXES = {
    "noreply", "no-reply", "mailer-daemon", "postmaster",
    "donotreply", "do-not-reply",
}
class EmailFinder:
    """
    Discovers and validates email addresses for business leads.

    Three-stage approach (see module docstring):
      1. Scrape common contact pages for mailto links & text emails.
      2. Try common prefix patterns (info@, contact@, ...) on the domain.
      3. Validate candidates via DNS MX record lookup (when dnspython
         is installed; otherwise MX checks are skipped).

    Usage::

        finder = EmailFinder()
        finder.find_email(lead)
        # lead.email is now populated if found
    """

    def __init__(self, timeout: int = 10):
        """
        :param timeout: per-request timeout (seconds) for page fetches.
        """
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": config.USER_AGENTS[0],
            "Accept": "text/html,application/xhtml+xml",
        })
        # domain -> bool, so each domain's MX lookup happens at most once.
        self._mx_cache: dict = {}

    def find_email(self, lead: Lead) -> Optional[str]:
        """
        Attempt to find an email for the given lead.
        Updates lead.email in-place if found.
        Returns the email or None.
        """
        if lead.email:
            # Already has an email — just validate it.
            if self._validate_email(lead.email):
                return lead.email
            # Stored email failed validation: clear it and rediscover below.
            lead.email = ""
        if not lead.website:
            return None
        domain = domain_from_url(lead.website)
        if not domain:
            return None
        found_emails: Set[str] = set()
        # ── Stage 1: Scrape contact/about pages ───────────────────────────
        base_url = normalise_url(lead.website)
        for path in config.EMAIL_CONTACT_PATHS:
            page_url = urljoin(base_url, path)
            found_emails.update(self._scrape_page_for_emails(page_url, domain))
            if found_emails:
                break  # Found emails, no need to check more pages
        # Also check homepage if nothing found
        if not found_emails:
            found_emails.update(self._scrape_page_for_emails(base_url, domain))
        # ── Stage 2: Try common patterns ──────────────────────────────────
        if not found_emails:
            for prefix in config.EMAIL_COMMON_PREFIXES:
                candidate = f"{prefix}@{domain}"
                if self._validate_email(candidate):
                    found_emails.add(candidate)
                    break  # First valid pattern is enough
        # ── Stage 3: Pick the best email ──────────────────────────────────
        if found_emails:
            best = self._pick_best_email(found_emails)
            lead.email = best
            logger.debug(f"Email found for {lead.business_name}: {best}")
            return best
        logger.debug(f"No email found for {lead.business_name}")
        return None

    def find_emails_bulk(self, leads: List[Lead]) -> int:
        """Find emails for multiple leads. Returns count of emails found."""
        count = 0
        for lead in leads:
            # Skip leads that already have an email or lack a website.
            if not lead.email and lead.website:
                if self.find_email(lead):
                    count += 1
        logger.info(f"Email finder: found {count} emails for {len(leads)} leads")
        return count

    # ══════════════════════════════════════════════════════════════════════
    # Internal methods
    # ══════════════════════════════════════════════════════════════════════
    def _scrape_page_for_emails(
        self, url: str, expected_domain: str
    ) -> Set[str]:
        """Fetch a page and extract email addresses matching the domain."""
        emails: Set[str] = set()
        try:
            resp = self.session.get(url, timeout=self.timeout, allow_redirects=True)
            if resp.status_code != 200:
                return emails
            html = resp.text
            # Method 1: BeautifulSoup mailto links
            if HAS_BS4:
                soup = BeautifulSoup(html, "html.parser")
                for link in soup.find_all("a", href=True):
                    href = link["href"]
                    # Case-insensitive scheme check ("MAILTO:" occurs in the
                    # wild); split on the first ':' only — str.replace would
                    # also mangle any later "mailto:" inside the address.
                    if href.lower().startswith("mailto:"):
                        email = href.split(":", 1)[1].split("?")[0].strip()
                        if self._is_valid_format(email):
                            emails.add(email.lower())
            # Method 2: Regex on raw HTML
            for match in _EMAIL_RE.findall(html):
                email = match.lower().strip()
                if self._is_valid_format(email):
                    emails.add(email)
        except Exception as e:
            # Best-effort scrape: network/parse failure just yields no emails.
            logger.debug(f"Email scrape error for {url}: {e}")
        # Prefer emails whose mailbox domain IS the expected domain (or a
        # subdomain of it). A plain substring test would wrongly accept
        # e.g. "foo@notexample.com" when expecting "example.com", or match
        # the local part instead of the domain.
        domain_emails = {
            e for e in emails
            if e.split("@", 1)[1] == expected_domain
            or e.split("@", 1)[1].endswith("." + expected_domain)
        }
        # Junk-filter both paths: noreply@<own-domain> is still useless.
        domain_emails = {e for e in domain_emails if not self._is_junk_email(e)}
        if domain_emails:
            return domain_emails
        # Filter out junk
        return {e for e in emails if not self._is_junk_email(e)}

    def _validate_email(self, email: str) -> bool:
        """Validate email format + optionally check MX record."""
        if not self._is_valid_format(email):
            return False
        if self._is_junk_email(email):
            return False
        # DNS MX record check
        domain = email.split("@")[1]
        return self._check_mx(domain)

    def _check_mx(self, domain: str) -> bool:
        """Check if domain has valid MX records (cached)."""
        if domain in self._mx_cache:
            return self._mx_cache[domain]
        if not HAS_DNS:
            # Without dnspython, assume valid
            self._mx_cache[domain] = True
            return True
        try:
            answers = dns.resolver.resolve(domain, "MX", lifetime=5)
            has_mx = len(answers) > 0
            self._mx_cache[domain] = has_mx
            return has_mx
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
                dns.resolver.NoNameservers, dns.exception.Timeout):
            # Definitive "no mail here" answers (or timeout) — cache False.
            self._mx_cache[domain] = False
            return False
        except Exception:
            self._mx_cache[domain] = True  # Assume valid on unexpected error
            return True

    @staticmethod
    def _is_valid_format(email: str) -> bool:
        """Basic format validation."""
        if not email or "@" not in email:
            return False
        parts = email.split("@")
        if len(parts) != 2:
            return False
        local, domain = parts
        if not local or not domain or "." not in domain:
            return False
        # Reject malformed dot placement that mailto: extraction can let
        # through (e.g. "a@b.com.", "a@.b.com", "a..b@c.com").
        if domain.startswith(".") or domain.endswith(".") or ".." in email:
            return False
        # RFC 5321 length limits: 254 total, 64 for the local part.
        if len(email) > 254 or len(local) > 64:
            return False
        return True

    @staticmethod
    def _is_junk_email(email: str) -> bool:
        """Check if email is a junk/system address."""
        local, domain = email.split("@", 1)
        if domain.lower() in _JUNK_DOMAINS:
            return True
        if local.lower() in _JUNK_PREFIXES:
            return True
        # Image/file extensions in domain — regex false positives such as
        # retina asset names like "logo@2x.png".
        if domain.endswith((".png", ".jpg", ".gif", ".svg", ".css", ".js")):
            return True
        return False

    @staticmethod
    def _pick_best_email(emails: Set[str]) -> str:
        """Pick the most likely business email from a set."""
        # Priority order for prefixes
        priority = ["info", "contact", "hello", "sales", "enquiry",
                    "admin", "office", "support"]
        for prefix in priority:
            # sorted() makes the choice deterministic when several emails
            # share the same preferred prefix (set order is arbitrary).
            for email in sorted(emails):
                if email.split("@")[0].lower() == prefix:
                    return email
        # Shortest (usually most generic = most useful); tie-break
        # alphabetically so repeated runs give the same answer.
        return min(emails, key=lambda e: (len(e), e))