Spaces:
Running
Running
| """ | |
| scraper/google_maps.py — Google Maps scraper using Playwright. | |
| Uses JavaScript evaluation to extract business data from the rendered | |
| Google Maps DOM. Each listing card is a `.Nv2PK` container with child | |
| elements for name, type, address, and rating. | |
| """ | |
| import re | |
| import time | |
| from typing import List, Optional | |
| from models import Lead | |
| from scraper.base_scraper import BaseScraper | |
| from utils.logger import get_logger | |
| from utils.helpers import clean_text, extract_phone, normalise_url | |
| logger = get_logger(__name__) | |
class GoogleMapsScraper(BaseScraper):
    """Scrape Google Maps business listings via Playwright.

    The scraper works in two phases:

    1. Load the search results page, scroll the results feed, and pull
       name / type / address / review count for every listing card in one
       JavaScript evaluation.
    2. Click each listing to open its detail panel and read the phone
       number, website and full address (best-effort; failures on a single
       listing are logged at debug level and skipped).
    """

    SOURCE_NAME = "Google Maps"
    MAPS_BASE_URL = "https://www.google.com/maps/search/"

    # JS snippet to extract listing data from the rendered DOM.
    # NOTE: this is a regular (non-raw) Python string, so backslashes meant
    # for JavaScript are doubled: "\\d" reaches the browser as "\d" and
    # "\\u00b7" as the JS escape for the middle dot separator.
    _EXTRACT_JS = """
    () => {
        const results = [];
        // Each listing card is a .Nv2PK container
        const cards = document.querySelectorAll('.Nv2PK');
        for (const card of cards) {
            const nameEl = card.querySelector('.qBF1Pd, .fontHeadlineSmall');
            if (!nameEl) continue;
            const name = nameEl.innerText.trim();
            if (!name) continue;

            // Rating
            const ratingEl = card.querySelector('.MW4etd');
            const rating = ratingEl ? ratingEl.innerText.trim() : '';

            // Review count (strip ALL thousands separators before parsing)
            const reviewEl = card.querySelector('.UY7F9');
            let reviewCount = 0;
            if (reviewEl) {
                const m = reviewEl.innerText.match(/[\\d,]+/);
                if (m) {
                    const parsed = parseInt(m[0].replace(/,/g, ''), 10);
                    if (!Number.isNaN(parsed)) reviewCount = parsed;
                }
            }

            // W4Efsd elements hold type, address, description, hours
            const w4 = card.querySelectorAll('.W4Efsd');
            let bizType = '';
            let address = '';
            let description = '';
            if (w4.length >= 2) {
                // w4[1] typically: "Type <dot> Price <dot> Address",
                // separated by the middle dot character (U+00B7)
                const parts = w4[1].innerText.split('\\u00b7').map(s => s.trim());
                if (parts.length >= 1) bizType = parts[0];
                if (parts.length >= 2) {
                    // Last part is usually address
                    address = parts[parts.length - 1];
                }
            }
            if (w4.length >= 4) {
                description = w4[3].innerText.trim();
            }

            // Accessible label of the place link (matched against the
            // listing name during enrichment)
            const link = card.querySelector('a[href*="/maps/place/"]');
            const ariaLabel = link ? (link.getAttribute('aria-label') || '') : '';

            results.push({
                name: name,
                rating: rating,
                reviewCount: reviewCount,
                bizType: bizType,
                address: address,
                description: description,
                ariaLabel: ariaLabel,
            });
        }
        return results;
    }
    """

    def scrape(
        self,
        keyword: str,
        location: str,
        limit: int = 30,
        category: str = "",
    ) -> List[Lead]:
        """Search Google Maps and extract business listings.

        Args:
            keyword: Search term, e.g. a business type.
            location: Location string; only the part before the first comma
                is used in the query.
            limit: Maximum number of leads to return.
            category: Service category stored on each lead; falls back to
                ``keyword`` when empty.

        Returns:
            The leads collected so far. Returns an empty list when
            Playwright is not installed; any other failure is logged and
            whatever was collected before it is returned.
        """
        try:
            from playwright.sync_api import sync_playwright, TimeoutError as PWTimeout
        except ImportError:
            logger.error("Playwright not installed.")
            return []
        # Local import keeps this block self-contained.
        from urllib.parse import quote_plus

        city = location.split(",")[0].strip()
        query = f"{keyword} in {city}"
        # quote_plus turns spaces into '+' and percent-encodes characters
        # such as '&' or '#' that would otherwise corrupt the search URL.
        url = f"{self.MAPS_BASE_URL}{quote_plus(query)}"
        logger.info(f"[Google Maps] Searching: {query!r}")

        leads: List[Lead] = []
        try:
            with sync_playwright() as pw:
                browser = pw.chromium.launch(
                    headless=True,
                    args=[
                        "--no-sandbox",
                        "--disable-dev-shm-usage",
                        "--disable-blink-features=AutomationControlled",
                    ],
                )
                try:
                    ctx = browser.new_context(
                        user_agent=self.session.headers["User-Agent"],
                        viewport={"width": 1280, "height": 900},
                        locale="en-IN",
                    )
                    page = ctx.new_page()

                    # Navigate with domcontentloaded (networkidle hangs on Maps)
                    page.goto(url, wait_until="domcontentloaded", timeout=45_000)
                    time.sleep(6)

                    # Dismiss consent dialogs (best-effort: usually absent)
                    try:
                        page.click('[aria-label="Accept all"]', timeout=3000)
                        time.sleep(1)
                    except Exception as exc:
                        logger.debug(f"[Google Maps] No consent dialog dismissed: {exc}")

                    # Wait for results feed
                    try:
                        page.wait_for_selector('div[role="feed"]', timeout=15_000)
                    except PWTimeout:
                        logger.warning("[Google Maps] Feed not found, waiting longer...")
                        time.sleep(5)

                    # Scroll to load more results
                    scroll_count = max(4, limit // 4)
                    for _ in range(scroll_count):
                        page.evaluate("""
                            () => {
                                const el = document.querySelector('div[role="feed"]');
                                if (el) el.scrollBy(0, 3000);
                            }
                        """)
                        time.sleep(1.5)

                    # -- PHASE 1: Bulk extract from list view via JS --
                    raw_data = page.evaluate(self._EXTRACT_JS) or []
                    logger.info(f"[Google Maps] JS extracted {len(raw_data)} listings.")
                    leads = self._leads_from_listings(
                        raw_data, category if category else keyword, limit
                    )

                    # -- PHASE 2: Click into each to get phone + website --
                    if leads:
                        logger.info(
                            f"[Google Maps] Enriching {len(leads)} leads with contact details..."
                        )
                        enriched = self._enrich_from_detail_panels(page, leads)
                        logger.info(f"[Google Maps] Enriched {enriched}/{len(leads)} leads.")
                finally:
                    # Always release the browser, even when scraping fails midway.
                    browser.close()
        except Exception as exc:
            logger.error(f"[Google Maps] Fatal error: {exc}")

        logger.info(f"[Google Maps] Extracted {len(leads)} leads for {query!r}")
        return leads

    def _leads_from_listings(self, raw_data, svc_cat: str, limit: int) -> List[Lead]:
        """Build up to ``limit`` leads from the JS-extracted listing dicts,
        skipping blank and duplicate names."""
        leads: List[Lead] = []
        seen_names: set = set()
        for item in raw_data:
            if len(leads) >= limit:
                break
            name = item.get("name", "").strip()
            if not name or name in seen_names:
                continue
            seen_names.add(name)
            leads.append(
                Lead(
                    business_name=name,
                    service_category=svc_cat,
                    phone="",
                    email="",
                    address=item.get("address", ""),
                    website="",
                    source=self.SOURCE_NAME,
                    lead_source_scraper="google_maps",
                    notes=item.get("bizType", ""),
                    review_count=item.get("reviewCount", 0),
                    gmb_exists=True,
                    has_https=False,
                    is_mobile_friendly=True,
                )
            )
            self._scraped_count += 1
        return leads

    def _enrich_from_detail_panels(self, page, leads: List[Lead]) -> int:
        """Open each listing's detail panel to fill phone, website and
        address; return how many distinct leads were enriched."""
        # Name -> lead lookup (first occurrence wins) replaces a per-link scan.
        by_name: dict = {}
        for lead in leads:
            by_name.setdefault(lead.business_name, lead)
        done: set = set()

        links = page.query_selector_all('a[href*="/maps/place/"]')
        for link in links:
            if len(done) >= len(by_name):
                break
            try:
                aria = link.get_attribute("aria-label") or ""
            except Exception as exc:
                logger.debug(f"Enrich error: {exc}")
                continue
            lead = by_name.get(aria)
            # Skip links that match no lead or a lead already enriched, so
            # the same business is never clicked or counted twice.
            if lead is None or aria in done:
                continue
            try:
                link.click()
                time.sleep(3)

                # Phone: prefer a tel: link; the other phone elements carry
                # no href, so read their visible text instead.
                phone = self._extract_detail(
                    page, ['a[href^="tel:"]'], attr="href", prefix="tel:"
                )
                if not phone:
                    phone = self._extract_detail(page, [
                        '[data-item-id*="phone"] .Io6YTe',
                        'button[data-tooltip="Copy phone number"]',
                    ], attr="text")

                website = self._extract_detail(page, [
                    'a[data-item-id="authority"]',
                    'a[data-tooltip="Open website"]',
                ], attr="href")

                address = self._extract_detail(page, [
                    '[data-item-id="address"] .Io6YTe',
                    'button[data-tooltip="Copy address"]',
                ], attr="text")

                if phone:
                    lead.phone = phone
                if website and "google" not in website.lower():
                    lead.website = normalise_url(website)
                    lead.has_https = website.startswith("https")
                if address:
                    lead.address = address
                done.add(aria)
            except Exception as exc:
                logger.debug(f"Enrich error: {exc}")
            # Return to the results list whether or not extraction succeeded.
            self._back_to_results(page)
        return len(done)

    def _back_to_results(self, page) -> None:
        """Click the detail panel's Back button if present (best-effort)."""
        try:
            back = page.query_selector('button[aria-label="Back"]')
            if back:
                back.click()
                time.sleep(2)
        except Exception as exc:
            logger.debug(f"[Google Maps] Could not navigate back: {exc}")

    def _extract_detail(self, page, selectors, attr="text", prefix="") -> str:
        """Try multiple selectors in order and return the first non-empty value.

        Args:
            page: Object exposing ``query_selector(selector)``.
            selectors: CSS selectors to try, in priority order.
            attr: ``"href"`` reads the href attribute, ``"text"`` reads the
                cleaned inner text, anything else reads that attribute.
            prefix: For ``attr="href"``, text up to and including the last
                occurrence of this prefix is removed (e.g. ``"tel:"``).

        Returns:
            The first non-empty value found, or ``""`` when no selector
            yields one. Errors on a selector are logged and skipped.
        """
        for sel in selectors:
            try:
                el = page.query_selector(sel)
                if not el:
                    continue
                if attr == "href":
                    val = el.get_attribute("href") or ""
                    if prefix and prefix in val:
                        val = val.split(prefix)[-1]
                    val = val.strip()
                elif attr == "text":
                    val = clean_text(el.inner_text())
                else:
                    val = el.get_attribute(attr) or ""
            except Exception as exc:
                logger.debug(f"[Google Maps] Selector {sel!r} failed: {exc}")
                continue
            # An element that matched but carries no value must not mask
            # the fallback selectors that follow it.
            if val:
                return val
        return ""