Spaces:
Running
Running
| """ | |
| scraper/email_finder.py — Discover email addresses for business leads. | |
| Three-stage approach: | |
| 1. Scrape common pages (/contact, /about, /team) for mailto links & text emails. | |
| 2. Try common prefix patterns (info@, contact@, hello@) on the domain. | |
| 3. Validate found emails via DNS MX record check. | |
| """ | |
| import re | |
| import socket | |
| from typing import List, Optional, Set | |
| from urllib.parse import urljoin | |
| try: | |
| import dns.resolver | |
| HAS_DNS = True | |
| except ImportError: | |
| HAS_DNS = False | |
| try: | |
| from bs4 import BeautifulSoup | |
| HAS_BS4 = True | |
| except ImportError: | |
| HAS_BS4 = False | |
| import requests | |
| import config | |
| from models import Lead | |
| from utils.logger import get_logger | |
| from utils.helpers import domain_from_url, normalise_url | |
| logger = get_logger(__name__) | |
# Regex for extracting email addresses from raw HTML/text.
# Matches local-part@domain.tld (TLD of 2+ letters). The explicit
# a-zA-Z classes already cover both cases, so re.IGNORECASE is
# technically redundant but harmless.
_EMAIL_RE = re.compile(
    r'[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}',
    re.IGNORECASE,
)
# Junk domains to ignore: CDN/platform/social domains that commonly
# appear in page markup but are never a business's contact address.
_JUNK_DOMAINS = {
    "example.com", "sentry.io", "wixpress.com", "wordpress.com",
    "googleapis.com", "googleusercontent.com", "w3.org",
    "schema.org", "facebook.com", "twitter.com", "instagram.com",
}
# Local-part prefixes of automated/system mailboxes — not useful leads.
_JUNK_PREFIXES = {
    "noreply", "no-reply", "mailer-daemon", "postmaster",
    "donotreply", "do-not-reply",
}
class EmailFinder:
    """
    Discovers and validates email addresses for business leads.

    Three-stage approach:
      1. Scrape common pages (/contact, /about, /team) for mailto links
         and plain-text emails.
      2. Try common prefix patterns (info@, contact@, ...) on the domain.
      3. Validate candidates via a DNS MX record check (when dnspython
         is installed).

    Usage::

        finder = EmailFinder()
        finder.find_email(lead)
        # lead.email is now populated if found
    """

    def __init__(self, timeout: int = 10):
        """
        Args:
            timeout: Per-request timeout in seconds for HTTP fetches.
        """
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": config.USER_AGENTS[0],
            "Accept": "text/html,application/xhtml+xml",
        })
        # domain -> bool cache of MX lookups so repeated leads on the
        # same domain cost only one DNS query.
        self._mx_cache: dict = {}

    def find_email(self, lead: "Lead") -> Optional[str]:
        """
        Attempt to find an email for the given lead.

        Updates ``lead.email`` in-place if found.

        Returns:
            The discovered email address, or None.
        """
        if lead.email:
            # Already has an email — just validate it; clear it if it
            # fails validation so the discovery stages run below.
            if self._validate_email(lead.email):
                return lead.email
            else:
                lead.email = ""
        if not lead.website:
            return None
        domain = domain_from_url(lead.website)
        if not domain:
            return None

        found_emails: Set[str] = set()

        # ── Stage 1: Scrape contact/about pages ───────────────────────
        base_url = normalise_url(lead.website)
        for path in config.EMAIL_CONTACT_PATHS:
            page_url = urljoin(base_url, path)
            emails = self._scrape_page_for_emails(page_url, domain)
            found_emails.update(emails)
            if found_emails:
                break  # Found emails, no need to check more pages
        # Also check homepage if nothing found
        if not found_emails:
            emails = self._scrape_page_for_emails(base_url, domain)
            found_emails.update(emails)

        # ── Stage 2: Try common patterns ──────────────────────────────
        if not found_emails:
            for prefix in config.EMAIL_COMMON_PREFIXES:
                candidate = f"{prefix}@{domain}"
                if self._validate_email(candidate):
                    found_emails.add(candidate)
                    break  # First valid pattern is enough

        # ── Stage 3: Pick the best email ──────────────────────────────
        if found_emails:
            best = self._pick_best_email(found_emails)
            lead.email = best
            logger.debug(f"Email found for {lead.business_name}: {best}")
            return best
        logger.debug(f"No email found for {lead.business_name}")
        return None

    def find_emails_bulk(self, leads: List["Lead"]) -> int:
        """Find emails for multiple leads. Returns count of emails found."""
        count = 0
        for lead in leads:
            # Skip leads that already have an email or have no website.
            if not lead.email and lead.website:
                result = self.find_email(lead)
                if result:
                    count += 1
        logger.info(f"Email finder: found {count} emails for {len(leads)} leads")
        return count

    # ──────────────────────────────────────────────────────────────────
    # Internal methods
    # ──────────────────────────────────────────────────────────────────

    def _scrape_page_for_emails(
        self, url: str, expected_domain: str
    ) -> Set[str]:
        """Fetch a page and extract email addresses matching the domain.

        Best-effort: any fetch/parse error is logged at debug level and
        treated as "no emails on this page".
        """
        emails = set()
        try:
            resp = self.session.get(url, timeout=self.timeout, allow_redirects=True)
            if resp.status_code != 200:
                return emails
            html = resp.text
            # Method 1: BeautifulSoup mailto links (more precise than regex)
            if HAS_BS4:
                soup = BeautifulSoup(html, "html.parser")
                for link in soup.find_all("a", href=True):
                    href = link["href"]
                    if href.startswith("mailto:"):
                        # Strip any ?subject=... query component.
                        email = href.replace("mailto:", "").split("?")[0].strip()
                        if self._is_valid_format(email):
                            emails.add(email.lower())
            # Method 2: Regex on raw HTML (catches plain-text emails)
            for match in _EMAIL_RE.findall(html):
                email = match.lower().strip()
                if self._is_valid_format(email):
                    emails.add(email)
        except Exception as e:
            logger.debug(f"Email scrape error for {url}: {e}")
        # Filter: prefer emails on the expected domain
        domain_emails = {e for e in emails if expected_domain in e}
        if domain_emails:
            return domain_emails
        # Otherwise fall back to any non-junk email found on the page.
        return {e for e in emails if not self._is_junk_email(e)}

    def _validate_email(self, email: str) -> bool:
        """Validate email format + optionally check MX record."""
        if not self._is_valid_format(email):
            return False
        if self._is_junk_email(email):
            return False
        # DNS MX record check on the domain part
        domain = email.split("@")[1]
        return self._check_mx(domain)

    def _check_mx(self, domain: str) -> bool:
        """Check if domain has valid MX records (cached)."""
        if domain in self._mx_cache:
            return self._mx_cache[domain]
        if not HAS_DNS:
            # Without dnspython, assume valid
            self._mx_cache[domain] = True
            return True
        try:
            answers = dns.resolver.resolve(domain, "MX", lifetime=5)
            has_mx = len(answers) > 0
            self._mx_cache[domain] = has_mx
            return has_mx
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer,
                dns.resolver.NoNameservers, dns.exception.Timeout):
            # Definitive "no mail server" answers (or timeout): cache as invalid.
            self._mx_cache[domain] = False
            return False
        except Exception:
            self._mx_cache[domain] = True  # Assume valid on unexpected error
            return True

    # NOTE: the three helpers below take no `self` but are called as
    # `self._is_valid_format(...)` etc. above — they must be static
    # methods, otherwise every instance call raises TypeError.
    @staticmethod
    def _is_valid_format(email: str) -> bool:
        """Basic format validation (RFC length limits, single @, dotted domain)."""
        if not email or "@" not in email:
            return False
        parts = email.split("@")
        if len(parts) != 2:
            return False
        local, domain = parts
        if not local or not domain or "." not in domain:
            return False
        # RFC 5321: 254 chars max overall, 64 max for the local part.
        if len(email) > 254 or len(local) > 64:
            return False
        return True

    @staticmethod
    def _is_junk_email(email: str) -> bool:
        """Check if email is a junk/system address."""
        local, domain = email.split("@", 1)
        if domain.lower() in _JUNK_DOMAINS:
            return True
        if local.lower() in _JUNK_PREFIXES:
            return True
        # Regex false positives: "emails" whose domain is really a file name
        # (e.g. icon@2x.png extracted from markup).
        if domain.endswith((".png", ".jpg", ".gif", ".svg", ".css", ".js")):
            return True
        return False

    @staticmethod
    def _pick_best_email(emails: Set[str]) -> str:
        """Pick the most likely business email from a set."""
        # Priority order for prefixes
        priority = ["info", "contact", "hello", "sales", "enquiry",
                    "admin", "office", "support"]
        for prefix in priority:
            for email in emails:
                if email.split("@")[0].lower() == prefix:
                    return email
        # Return shortest (usually most generic = most useful); lexical
        # tiebreak keeps the choice deterministic across runs, since set
        # iteration order varies with hash randomization.
        return min(emails, key=lambda e: (len(e), e))