# LeadGenPro β€” lead_gen/scraper/google_maps.py
# Author: MaSTer-suFYan
# Commit beec01d β€” "feat: LeadGen Pro v2.0 β€” full system with bug fixes"
"""
scraper/google_maps.py β€” Google Maps scraper using Playwright.
Uses JavaScript evaluation to extract business data from the rendered
Google Maps DOM. Each listing card is a `.Nv2PK` container with child
elements for name, type, address, and rating.
"""
import re
import time
from typing import List, Optional
from urllib.parse import quote_plus

from models import Lead
from scraper.base_scraper import BaseScraper
from utils.logger import get_logger
from utils.helpers import clean_text, extract_phone, normalise_url
logger = get_logger(__name__)
class GoogleMapsScraper(BaseScraper):
    """Scrapes Google Maps business listings via Playwright.

    Workflow:
      1. Load a Maps search-results page and scroll the feed so more
         listing cards lazy-load.
      2. PHASE 1 β€” bulk-extract name / rating / type / address from the
         rendered DOM with a single JS evaluation (fast, no clicks).
      3. PHASE 2 β€” click into each listing to enrich the lead with
         phone, website and full address (slow, best-effort).

    All failures are logged and swallowed; ``scrape`` never raises.
    """

    SOURCE_NAME = "Google Maps"
    MAPS_BASE_URL = "https://www.google.com/maps/search/"

    # JS snippet to extract listing data from the rendered DOM.
    # Each listing card is a `.Nv2PK` container; the child class names
    # (.qBF1Pd, .MW4etd, .UY7F9, .W4Efsd) are Google-internal and may
    # change without notice β€” keep selectors in one place here.
    _EXTRACT_JS = """
    () => {
        const results = [];
        // Each listing card is a .Nv2PK container
        const cards = document.querySelectorAll('.Nv2PK');
        for (const card of cards) {
            const nameEl = card.querySelector('.qBF1Pd, .fontHeadlineSmall');
            if (!nameEl) continue;
            const name = nameEl.innerText.trim();
            if (!name) continue;
            // Rating, e.g. "4.3"
            const ratingEl = card.querySelector('.MW4etd');
            const rating = ratingEl ? ratingEl.innerText.trim() : '';
            // Review count, e.g. "(1,234)" -> 1234.
            // NOTE: /,/g strips ALL thousands separators; a plain string
            // pattern would only replace the first comma ("1,234,567"
            // would have parsed as 1234).
            const reviewEl = card.querySelector('.UY7F9');
            let reviewCount = 0;
            if (reviewEl) {
                const m = reviewEl.innerText.match(/[\\d,]+/);
                if (m) reviewCount = parseInt(m[0].replace(/,/g, ''), 10);
            }
            // W4Efsd elements hold type, address, description, hours
            const w4 = card.querySelectorAll('.W4Efsd');
            let bizType = '';
            let address = '';
            let description = '';
            if (w4.length >= 2) {
                // w4[1] typically: "Type Β· β‚Ήβ‚Ήβ‚Ή Β· Address"
                const parts = w4[1].innerText.split('Β·').map(s => s.trim());
                if (parts.length >= 1) bizType = parts[0];
                if (parts.length >= 2) {
                    // Last part is usually address
                    address = parts[parts.length - 1];
                }
            }
            if (w4.length >= 4) {
                description = w4[3].innerText.trim();
            }
            // Try to get phone from aria-label or href
            const link = card.querySelector('a[href*="/maps/place/"]');
            const ariaLabel = link ? (link.getAttribute('aria-label') || '') : '';
            results.push({
                name: name,
                rating: rating,
                reviewCount: reviewCount,
                bizType: bizType,
                address: address,
                description: description,
                ariaLabel: ariaLabel,
            });
        }
        return results;
    }
    """

    def scrape(
        self,
        keyword: str,
        location: str,
        limit: int = 30,
        category: str = "",
    ) -> List[Lead]:
        """Search Google Maps and extract business listings.

        Args:
            keyword:  Search term, e.g. "plumber".
            location: City or "City, State"; only the part before the
                      first comma is used in the query.
            limit:    Maximum number of leads to return.
            category: Service category stored on each lead; falls back
                      to ``keyword`` when empty.

        Returns:
            List of Lead objects β€” empty on any failure.
        """
        try:
            from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
        except ImportError:
            logger.error("Playwright not installed.")
            return []

        city = location.split(",")[0].strip()
        query = f"{keyword} in {city}"
        # quote_plus properly percent-encodes '&', '#', unicode, etc.;
        # a bare replace(' ', '+') corrupted queries containing reserved
        # URL characters. Spaces still encode as '+', so simple queries
        # produce the exact same URL as before.
        url = f"{self.MAPS_BASE_URL}{quote_plus(query)}"
        logger.info(f"[Google Maps] Searching: {query!r}")

        leads: List[Lead] = []
        try:
            with sync_playwright() as pw:
                browser = pw.chromium.launch(
                    headless=True,
                    args=["--no-sandbox", "--disable-dev-shm-usage",
                          "--disable-blink-features=AutomationControlled"],
                )
                ctx = browser.new_context(
                    user_agent=self.session.headers["User-Agent"],
                    viewport={"width": 1280, "height": 900},
                    locale="en-IN",
                )
                page = ctx.new_page()
                # Navigate with domcontentloaded (networkidle hangs on Maps)
                page.goto(url, wait_until="domcontentloaded", timeout=45_000)
                time.sleep(6)
                self._dismiss_consent(page)
                # Wait for results feed
                try:
                    page.wait_for_selector('div[role="feed"]', timeout=15_000)
                except PWTimeout:
                    logger.warning("[Google Maps] Feed not found, waiting longer...")
                    time.sleep(5)
                self._scroll_feed(page, limit)
                # ── PHASE 1: Bulk extract from list view via JS ───────────
                raw_data = page.evaluate(self._EXTRACT_JS)
                logger.info(f"[Google Maps] JS extracted {len(raw_data)} listings.")
                leads = self._build_leads(raw_data, limit, category if category else keyword)
                # ── PHASE 2: Click into each to get phone + website ───────
                if leads:
                    self._enrich_leads(page, leads)
                browser.close()
        except Exception as exc:
            logger.error(f"[Google Maps] Fatal error: {exc}")
        logger.info(f"[Google Maps] Extracted {len(leads)} leads for {query!r}")
        return leads

    def _dismiss_consent(self, page) -> None:
        """Best-effort click-through of Google's cookie-consent dialog."""
        try:
            page.click('[aria-label="Accept all"]', timeout=3000)
            time.sleep(1)
        except Exception:
            pass  # No dialog shown β€” nothing to dismiss.

    def _scroll_feed(self, page, limit: int) -> None:
        """Scroll the results feed so more listing cards lazy-load."""
        scroll_count = max(4, limit // 4)
        for _ in range(scroll_count):
            page.evaluate("""
                var el = document.querySelector('div[role="feed"]');
                if (el) el.scrollBy(0, 3000);
            """)
            time.sleep(1.5)

    def _build_leads(self, raw_data, limit: int, svc_cat: str) -> List[Lead]:
        """Convert JS-extracted dicts into Lead objects, deduplicated by name."""
        leads: List[Lead] = []
        seen_names: set = set()
        for item in raw_data:
            if len(leads) >= limit:
                break
            name = item.get("name", "").strip()
            if not name or name in seen_names:
                continue
            seen_names.add(name)
            leads.append(Lead(
                business_name=name,
                service_category=svc_cat,
                phone="",
                email="",
                address=item.get("address", ""),
                website="",
                source=self.SOURCE_NAME,
                lead_source_scraper="google_maps",
                notes=item.get("bizType", ""),
                review_count=item.get("reviewCount", 0),
                gmb_exists=True,          # listing came from Maps itself
                has_https=False,          # unknown until a website is found
                is_mobile_friendly=True,
            ))
            self._scraped_count += 1
        return leads

    def _go_back(self, page, delay: float) -> None:
        """Return from a place-detail panel to the results list (best effort)."""
        try:
            back = page.query_selector('button[aria-label="Back"]')
            if back:
                back.click()
                time.sleep(delay)
        except Exception:
            pass

    def _enrich_leads(self, page, leads: List[Lead]) -> None:
        """Click into each listing to pick up phone, website and full address.

        Mutates the Lead objects in place. Stale element handles (Maps
        re-renders the feed after navigating back) are caught per-link,
        so one failed listing never aborts the rest.
        """
        logger.info(f"[Google Maps] Enriching {len(leads)} leads with contact details...")
        links = page.query_selector_all('a[href*="/maps/place/"]')
        enriched = 0
        for link in links:
            if enriched >= len(leads):
                break
            try:
                aria = link.get_attribute("aria-label") or ""
                # Match the detail link back to a phase-1 lead by name.
                matching = [l for l in leads if l.business_name == aria]
                if not matching:
                    continue
                link.click()
                time.sleep(3)
                # Extract phone
                phone = self._extract_detail(page, [
                    'a[href^="tel:"]',
                    'button[data-tooltip="Copy phone number"]',
                    '[data-item-id*="phone"] .Io6YTe',
                ], attr="href", prefix="tel:")
                # Extract website
                website = self._extract_detail(page, [
                    'a[data-item-id="authority"]',
                    'a[data-tooltip="Open website"]',
                ], attr="href")
                # Extract full address
                address = self._extract_detail(page, [
                    '[data-item-id="address"] .Io6YTe',
                    'button[data-tooltip="Copy address"]',
                ], attr="text")
                lead = matching[0]
                if phone:
                    lead.phone = phone
                if website and "google" not in website.lower():
                    lead.website = normalise_url(website)
                    lead.has_https = website.startswith("https")
                if address:
                    lead.address = address
                enriched += 1
                self._go_back(page, 2)
            except Exception as exc:
                logger.debug(f"Enrich error: {exc}")
                # Try to go back anyway so the next link is reachable.
                self._go_back(page, 1.5)
        logger.info(f"[Google Maps] Enriched {enriched}/{len(leads)} leads.")

    def _extract_detail(self, page, selectors, attr="text", prefix=""):
        """Try each selector in order and return the first NON-EMPTY value.

        Args:
            page:      Playwright page showing a place-detail panel.
            selectors: CSS selectors to try, in priority order.
            attr:      "text" for inner text, "href" or any attribute name.
            prefix:    For href values, strip everything up to and including
                       this prefix (e.g. "tel:" for phone links).

        Returns:
            Extracted string, or "" when no selector yields a value.

        Fix vs. previous version: an element that matched but carried an
        empty value (e.g. a button with no href) used to short-circuit
        the remaining fallback selectors; now we keep trying.
        """
        for sel in selectors:
            try:
                el = page.query_selector(sel)
                if not el:
                    continue
                if attr == "href":
                    val = el.get_attribute("href") or ""
                    if prefix and prefix in val:
                        val = val.split(prefix)[-1].strip()
                    val = val.strip()
                elif attr == "text":
                    val = clean_text(el.inner_text())
                else:
                    val = el.get_attribute(attr) or ""
                if val:
                    return val
            except Exception:
                pass
        return ""