"""Scrape business listings from Google Maps search results using Playwright.

Run as a script with ``--niche`` and ``--location`` (and optionally
``--limit``); the collected leads are written to ``.tmp/raw_leads.json``.
"""

import argparse
import json
import os
import time
from urllib.parse import quote_plus

from playwright.sync_api import sync_playwright

# The scrollable results sidebar and the individual result cards inside it.
_FEED_SELECTOR = 'div[role="feed"]'
_ARTICLE_SELECTOR = 'div[role="feed"] div[role="article"]'

# Candidate selectors, tried in order; the first non-empty value wins.
_WEBSITE_SELECTORS = (
    'a.lcr4fd',
    'a[data-value="Website"]',
    'a[href^="http"]:not([href*="google"])',
)
_PHONE_SELECTORS = (
    'span.Us7fWe',
    'span.UsdlK',
    'button[data-item-id*="phone"] div.fontBodyMedium',
)


def _apply_stealth(page):
    """Apply playwright_stealth to *page* if possible; report and continue otherwise."""
    # The package is optional, so it is imported lazily and any failure
    # (missing package, incompatible API) is reported instead of raised.
    try:
        from playwright_stealth import stealth
        if hasattr(stealth, 'stealth'):
            stealth.stealth(page)
        else:
            stealth(page)
        print("[+] Stealth applied.")
    except Exception as e:
        print(f"[!] Stealth not applied: {e}")


def _scroll_feed(page, max_results, max_stale_scrolls):
    """Scroll the results sidebar until enough cards load or the list stops growing."""
    stale_scrolls = 0
    last_count = 0

    while stale_scrolls < max_stale_scrolls:
        # Scroll the feed container itself (not the whole page).
        try:
            page.evaluate(
                """() => {
                    const feed = document.querySelector('div[role="feed"]');
                    if (feed) feed.scrollBy(0, 2500);
                }"""
            )
        except Exception:
            page.mouse.wheel(0, 3000)  # fallback

        time.sleep(2.5)  # wait for lazy-loaded results

        current_count = page.locator(_ARTICLE_SELECTOR).count()
        print(f" Items visible: {current_count}")

        if current_count >= max_results:
            print(f"[+] Reached target count ({max_results}). Stopping scroll.")
            break

        if current_count == last_count:
            stale_scrolls += 1
            print(f" No new items (stale {stale_scrolls}/{max_stale_scrolls})...")
        else:
            stale_scrolls = 0
            last_count = current_count

        # Best-effort check for the end-of-list indicator: a failure here must
        # not abort the scroll loop, the stale counter still terminates it.
        try:
            end_text = page.locator("text=You've reached the end of the list").count()
        except Exception:
            continue
        if end_text > 0:
            print("[+] End of list reached.")
            break


def _extract_name(item):
    """Return the card's name from its aria-label, falling back to the title element."""
    name = item.get_attribute("aria-label") or ""
    if not name:
        title = item.locator('div.qBF1Pd')
        if title.count() > 0:
            name = title.first.inner_text()
    return name


def _extract_website(item):
    """Return the first non-empty href among the website selectors, or ""."""
    for selector in _WEBSITE_SELECTORS:
        link = item.locator(selector)
        if link.count() > 0:
            href = link.first.get_attribute("href") or ""
            if href:
                return href
    return ""


def _extract_phone(item):
    """Return the first non-empty text among the phone selectors, or ""."""
    for selector in _PHONE_SELECTORS:
        node = item.locator(selector)
        if node.count() > 0:
            text = node.first.inner_text().strip()
            if text:
                return text
    return ""


def _extract_rating(item):
    """Return the rating text, falling back to the first word of the stars aria-label."""
    rating_node = item.locator('span.MW4etd')
    if rating_node.count() > 0:
        return rating_node.first.inner_text().strip()

    stars_node = item.locator('span[role="img"][aria-label*="stars"]')
    if stars_node.count() > 0:
        aria = stars_node.first.get_attribute("aria-label") or ""
        return aria.split(" ")[0]
    return ""


def scrape_google_maps(query: str, max_results: int = 50, *, max_stale_scrolls: int = 15) -> list:
    """Scrape Google Maps for *query* and return a list of business leads.

    The sidebar feed panel is scrolled to load results until ``max_results``
    cards are visible, the end-of-list marker appears, or
    ``max_stale_scrolls`` consecutive scrolls bring no new cards.

    Args:
        query: Free-text search query; it is URL-encoded before use.
        max_results: Maximum number of result cards to extract. A
            non-positive value returns an empty list without launching
            a browser.
        max_stale_scrolls: Number of consecutive scrolls without new cards
            after which scrolling stops.

    Returns:
        A list of dicts with the keys ``name``, ``website``, ``phone``,
        ``rating`` and ``search_query``, with exact duplicates removed.
        An empty list is returned when the page fails to load or the
        results feed never appears.
    """
    if max_results <= 0:
        # A negative slice bound would otherwise drop items from the END of
        # the result list rather than limiting it.
        print("[!] max_results must be positive; nothing to scrape.")
        return []

    leads = []
    seen = set()  # (name, website, phone, rating) tuples already collected

    with sync_playwright() as p:
        print("[*] Launching browser...")
        browser = p.chromium.launch(headless=True)
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
                locale="en-US",
                viewport={"width": 1280, "height": 900},
            )
            page = context.new_page()

            # Apply stealth if available
            _apply_stealth(page)

            print(f"[*] Searching: '{query}'")
            # quote_plus keeps the "+ for space" form while also escaping
            # characters such as &, #, ? and / that would corrupt the URL.
            search_url = f"https://www.google.com/maps/search/{quote_plus(query)}"

            try:
                page.goto(search_url, timeout=60000)
            except Exception as e:
                print(f"[!] Page load failed: {e}")
                return []

            # Wait for the results feed to appear
            try:
                print("[*] Waiting for results feed...")
                page.wait_for_selector(_FEED_SELECTOR, timeout=25000)
                print("[+] Results feed found.")
            except Exception as e:
                print(f"[!] Results feed not found: {e}")
                return []

            # ── Scroll the FEED PANEL (not the whole page) ──────────────────
            # Google Maps renders results in a scrollable sidebar feed
            print(f"[*] Scrolling feed to collect up to {max_results} results...")
            _scroll_feed(page, max_results, max_stale_scrolls)

            # ── Extraction ───────────────────────────────────────────────────
            final_items = page.locator(_ARTICLE_SELECTOR).all()[:max_results]
            total = len(final_items)
            print(f"[*] Extracting data from {total} items...")

            for i, item in enumerate(final_items):
                # One broken card must not abort the whole run.
                try:
                    # Name from aria-label (most reliable)
                    name = _extract_name(item)
                    if not name:
                        continue

                    print(f" [{i+1}/{total}] {name}")

                    website = _extract_website(item)
                    phone = _extract_phone(item)
                    rating = _extract_rating(item)

                    key = (name, website, phone, rating)
                    if key in seen:
                        continue
                    seen.add(key)
                    leads.append({
                        "name": name,
                        "website": website,
                        "phone": phone,
                        "rating": rating,
                        "search_query": query,
                    })
                except Exception as e:
                    print(f"[!] Error on item {i}: {e}")
        finally:
            # Close the browser on every exit path, including unexpected errors.
            browser.close()

        print(f"[+] Done. Collected {len(leads)} unique leads.")

    return leads


def main():
    """Parse CLI arguments, run the scrape and save the leads as JSON."""
    parser = argparse.ArgumentParser(description='Google Maps Scraper')
    parser.add_argument('--niche', required=True)
    parser.add_argument('--location', required=True)
    parser.add_argument('--limit', type=int, default=10)
    args = parser.parse_args()

    full_query = f"{args.niche} in {args.location}"
    results = scrape_google_maps(full_query, args.limit)

    os.makedirs(".tmp", exist_ok=True)
    with open(".tmp/raw_leads.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=4)
    print(f"[+] Saved {len(results)} leads to .tmp/raw_leads.json")


if __name__ == "__main__":
    main()