# lead-hunter-ai — tools/google_maps_scraper.py
# Google Maps business-lead scraper (Playwright, headless Chromium).
import argparse
import json
import os
import time
from urllib.parse import quote_plus

from playwright.sync_api import sync_playwright
def _apply_stealth(page):
    """Best-effort: apply playwright-stealth to *page* if the package is installed.

    The package's public API differs between versions (module with a
    ``stealth`` attribute vs. a directly-importable callable), so both
    shapes are tried. Failure is non-fatal: scraping proceeds without it.
    """
    try:
        from playwright_stealth import stealth
        if hasattr(stealth, 'stealth'):
            stealth.stealth(page)
        else:
            stealth(page)
        print("[+] Stealth applied.")
    except Exception as e:
        print(f"[!] Stealth not applied: {e}")


def _scroll_feed(page, max_results, max_stale=15):
    """Scroll the Google Maps sidebar feed panel to lazy-load results.

    Stops when *max_results* cards are visible, when the end-of-list
    marker appears, or after *max_stale* consecutive scrolls that load
    nothing new. Scrolls the feed container itself (not the window),
    because Maps renders results inside a scrollable sidebar.
    """
    stale_scrolls = 0
    last_count = 0
    print(f"[*] Scrolling feed to collect up to {max_results} results...")
    while stale_scrolls < max_stale:
        try:
            page.evaluate(
                """() => {
                    const feed = document.querySelector('div[role="feed"]');
                    if (feed) feed.scrollBy(0, 2500);
                }"""
            )
        except Exception:
            page.mouse.wheel(0, 3000)  # fallback: wheel-scroll the page
        time.sleep(2.5)  # wait for lazy-loaded results to render

        # .count() avoids materializing element handles just to measure length.
        current_count = page.locator('div[role="feed"] div[role="article"]').count()
        print(f"    Items visible: {current_count}")
        if current_count >= max_results:
            print(f"[+] Reached target count ({max_results}). Stopping scroll.")
            break
        if current_count == last_count:
            stale_scrolls += 1
            print(f"    No new items (stale {stale_scrolls}/{max_stale})...")
        else:
            stale_scrolls = 0
            last_count = current_count

        # Maps shows a sentinel message when there is nothing left to load.
        try:
            if page.locator("text=You've reached the end of the list").count() > 0:
                print("[+] End of list reached.")
                break
        except Exception:
            pass


def _extract_lead(item, query):
    """Extract one lead dict from a result card locator.

    Returns ``None`` when no business name can be found (the card is
    useless without one). Selectors are tried in order of reliability;
    the class-based ones are fragile and may need updating when Google
    changes its markup.
    """
    # Name: the card's aria-label is the most reliable source.
    name = item.get_attribute("aria-label") or ""
    if not name:
        name_loc = item.locator('div.qBF1Pd')
        if name_loc.count() > 0:
            name = name_loc.first.inner_text()
    if not name:
        return None

    website = ""
    for sel in ('a.lcr4fd', 'a[data-value="Website"]',
                'a[href^="http"]:not([href*="google"])'):
        link = item.locator(sel)
        if link.count() > 0:
            website = link.first.get_attribute("href") or ""
        if website:
            break

    phone = ""
    for sel in ('span.Us7fWe', 'span.UsdlK',
                'button[data-item-id*="phone"] div.fontBodyMedium'):
        ph = item.locator(sel)
        if ph.count() > 0:
            phone = ph.first.inner_text().strip()
        if phone:
            break

    rating = ""
    stars = item.locator('span.MW4etd')
    if stars.count() > 0:
        rating = stars.first.inner_text().strip()
    else:
        # Fallback: parse the leading number out of e.g. "4.5 stars".
        stars_img = item.locator('span[role="img"][aria-label*="stars"]')
        if stars_img.count() > 0:
            aria = stars_img.first.get_attribute("aria-label") or ""
            rating = aria.split(" ")[0]

    return {
        "name": name,
        "website": website,
        "phone": phone,
        "rating": rating,
        "search_query": query,
    }


def scrape_google_maps(query, max_results=50):
    """Scrape Google Maps for *query* and return a list of business leads.

    Launches a headless Chromium page, scrolls the sidebar results feed to
    lazy-load entries, then extracts name/website/phone/rating from each
    result card.

    Args:
        query: Free-text search, e.g. "plumbers in Austin".
        max_results: Stop once this many result cards are visible.

    Returns:
        List of dicts with keys ``name``, ``website``, ``phone``,
        ``rating``, ``search_query``; duplicates removed. Empty list if
        navigation fails or the results feed never appears.
    """
    leads = []
    with sync_playwright() as p:
        print("[*] Launching browser...")
        browser = p.chromium.launch(headless=True)
        # try/finally guarantees the browser is closed even if scrolling or
        # extraction raises unexpectedly (the original leaked it on that path).
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
                locale="en-US",
                viewport={"width": 1280, "height": 900},
            )
            page = context.new_page()
            _apply_stealth(page)

            print(f"[*] Searching: '{query}'")
            # quote_plus handles '&', '/', '#' and non-ASCII in the query;
            # a bare replace(' ', '+') produced broken URLs for those.
            search_url = f"https://www.google.com/maps/search/{quote_plus(query)}"
            try:
                page.goto(search_url, timeout=60000)
            except Exception as e:
                print(f"[!] Page load failed: {e}")
                return []

            try:
                print("[*] Waiting for results feed...")
                page.wait_for_selector('div[role="feed"]', timeout=25000)
                print("[+] Results feed found.")
            except Exception as e:
                print(f"[!] Results feed not found: {e}")
                return []

            _scroll_feed(page, max_results)

            final_items = page.locator(
                'div[role="feed"] div[role="article"]'
            ).all()[:max_results]
            print(f"[*] Extracting data from {len(final_items)} items...")

            seen = set()  # O(1) dedup instead of O(n) `lead not in leads`
            for i, item in enumerate(final_items):
                try:
                    lead = _extract_lead(item, query)
                    if lead is None:
                        continue
                    print(f"    [{i+1}/{len(final_items)}] {lead['name']}")
                    key = (lead["name"], lead["website"], lead["phone"], lead["rating"])
                    if key not in seen:
                        seen.add(key)
                        leads.append(lead)
                except Exception as e:
                    print(f"[!] Error on item {i}: {e}")
        finally:
            browser.close()
    print(f"[+] Done. Collected {len(leads)} unique leads.")
    return leads
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Google Maps Scraper')
    parser.add_argument('--niche', required=True, help="Business type, e.g. 'dentist'")
    parser.add_argument('--location', required=True, help="City or region, e.g. 'Austin, TX'")
    parser.add_argument('--limit', type=int, default=10, help="Maximum number of leads")
    args = parser.parse_args()

    full_query = f"{args.niche} in {args.location}"
    results = scrape_google_maps(full_query, args.limit)

    os.makedirs(".tmp", exist_ok=True)
    out_path = ".tmp/raw_leads.json"
    # Explicit UTF-8 + ensure_ascii=False: business names are often non-ASCII,
    # and the platform-default encoding can make the write crash or mangle them.
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=4, ensure_ascii=False)
    print(f"[+] Saved {len(results)} leads to {out_path}")