import argparse
import json
import os
import time
from urllib.parse import quote_plus

from playwright.sync_api import sync_playwright
|
|
def _scroll_feed(page, max_results, max_stale=15):
    """Scroll the Maps sidebar feed until enough result cards are loaded.

    Stops when `max_results` cards are visible, when `max_stale`
    consecutive scrolls add no new cards, or when Maps shows its
    "end of the list" marker.
    """
    stale = 0
    last_count = 0
    print(f"[*] Scrolling feed to collect up to {max_results} results...")
    while stale < max_stale:
        # Scroll the feed element itself; the page body does not scroll.
        try:
            page.evaluate(
                """() => {
                    const feed = document.querySelector('div[role="feed"]');
                    if (feed) feed.scrollBy(0, 2500);
                }"""
            )
        except Exception:
            # Fallback: wheel over whatever is under the cursor.
            page.mouse.wheel(0, 3000)

        time.sleep(2.5)  # give lazy-loaded cards time to render

        count = page.locator('div[role="feed"] div[role="article"]').count()
        print(f"    Items visible: {count}")

        if count >= max_results:
            print(f"[+] Reached target count ({max_results}). Stopping scroll.")
            return

        if count == last_count:
            stale += 1
            print(f"    No new items (stale {stale}/{max_stale})...")
        else:
            stale = 0
        last_count = count

        try:
            if page.locator("text=You've reached the end of the list").count() > 0:
                print("[+] End of list reached.")
                return
        except Exception:
            pass


def _extract_lead(item, query):
    """Extract name/website/phone/rating from one result-card locator.

    Returns a lead dict, or None when no business name can be found
    (cards without a name are skipped by the caller).
    """
    name = item.get_attribute("aria-label") or ""
    if not name:
        name_loc = item.locator('div.qBF1Pd')
        if name_loc.count() > 0:
            name = name_loc.first.inner_text()
    if not name:
        return None

    # Try several selectors; Maps markup varies between result types.
    website = ""
    for sel in ('a.lcr4fd', 'a[data-value="Website"]',
                'a[href^="http"]:not([href*="google"])'):
        link = item.locator(sel)
        if link.count() > 0:
            website = link.first.get_attribute("href") or ""
            if website:
                break

    phone = ""
    for sel in ('span.Us7fWe', 'span.UsdlK',
                'button[data-item-id*="phone"] div.fontBodyMedium'):
        ploc = item.locator(sel)
        if ploc.count() > 0:
            phone = ploc.first.inner_text().strip()
            if phone:
                break

    rating = ""
    rloc = item.locator('span.MW4etd')
    if rloc.count() > 0:
        rating = rloc.first.inner_text().strip()
    else:
        # Fallback: parse the leading number out of the stars aria-label.
        rloc2 = item.locator('span[role="img"][aria-label*="stars"]')
        if rloc2.count() > 0:
            aria = rloc2.first.get_attribute("aria-label") or ""
            rating = aria.split(" ")[0]

    return {
        "name": name,
        "website": website,
        "phone": phone,
        "rating": rating,
        "search_query": query,
    }


def scrape_google_maps(query, max_results=50):
    """Scrape Google Maps search results for `query`.

    Scrolls the sidebar feed panel to load listings, then extracts the
    business name, website, phone, and rating from each result card.

    Args:
        query: Free-text Maps search, e.g. "plumbers in Austin".
        max_results: Stop once this many result cards are visible.

    Returns:
        List of unique lead dicts with keys
        name, website, phone, rating, search_query.
        Empty list when the page or results feed fails to load.
    """
    leads = []
    seen = set()  # dedupe keys already collected -- O(1) instead of list scans

    with sync_playwright() as p:
        print("[*] Launching browser...")
        browser = p.chromium.launch(headless=True)
        try:
            context = browser.new_context(
                user_agent=(
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/122.0.0.0 Safari/537.36"
                ),
                locale="en-US",
                viewport={"width": 1280, "height": 900},
            )
            page = context.new_page()

            # Best-effort stealth; the playwright-stealth API changed
            # between versions, so probe for both call shapes.
            try:
                from playwright_stealth import stealth
                if hasattr(stealth, 'stealth'):
                    stealth.stealth(page)
                else:
                    stealth(page)
                print("[+] Stealth applied.")
            except Exception as e:
                print(f"[!] Stealth not applied: {e}")

            print(f"[*] Searching: '{query}'")
            # quote_plus handles '&', '#', unicode, etc. -- a bare
            # str.replace(' ', '+') would corrupt such queries.
            search_url = f"https://www.google.com/maps/search/{quote_plus(query)}"

            try:
                page.goto(search_url, timeout=60000)
            except Exception as e:
                print(f"[!] Page load failed: {e}")
                return []

            try:
                print("[*] Waiting for results feed...")
                page.wait_for_selector('div[role="feed"]', timeout=25000)
                print("[+] Results feed found.")
            except Exception as e:
                print(f"[!] Results feed not found: {e}")
                return []

            _scroll_feed(page, max_results)

            items = page.locator('div[role="feed"] div[role="article"]').all()
            final_items = items[:max_results]
            print(f"[*] Extracting data from {len(final_items)} items...")

            for i, item in enumerate(final_items):
                try:
                    lead = _extract_lead(item, query)
                    if lead is None:
                        continue
                    print(f"  [{i+1}/{len(final_items)}] {lead['name']}")
                    key = (lead["name"], lead["website"],
                           lead["phone"], lead["rating"])
                    if key not in seen:
                        seen.add(key)
                        leads.append(lead)
                except Exception as e:
                    print(f"[!] Error on item {i}: {e}")
        finally:
            # Always release the browser, even if scrolling/extraction raised
            # (the original leaked the process on any mid-scrape exception).
            browser.close()

    print(f"[+] Done. Collected {len(leads)} unique leads.")
    return leads
|
|
|
|
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Google Maps Scraper')
    parser.add_argument('--niche', required=True, help="Business type to search for")
    parser.add_argument('--location', required=True, help="City/region to search in")
    parser.add_argument('--limit', type=int, default=10, help="Max leads to collect")
    args = parser.parse_args()

    full_query = f"{args.niche} in {args.location}"
    results = scrape_google_maps(full_query, args.limit)

    os.makedirs(".tmp", exist_ok=True)
    # Explicit encoding + ensure_ascii=False keep non-ASCII business
    # names readable in the output file.
    with open(".tmp/raw_leads.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=4, ensure_ascii=False)
    print(f"[+] Saved {len(results)} leads to .tmp/raw_leads.json")
|
|