""" Google Maps scraper using Selenium. Extracts business name, address, phone, and website from search results. """ import time import random import logging from typing import List, Dict from selenium import webdriver from selenium.webdriver.chrome.service import Service from selenium.webdriver.chrome.options import Options from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from webdriver_manager.chrome import ChromeDriverManager logger = logging.getLogger(__name__) def _random_delay(min_sec: float = 1.5, max_sec: float = 3.5): """Add a random delay to mimic human behavior.""" time.sleep(random.uniform(min_sec, max_sec)) def create_driver() -> webdriver.Chrome: """Create a headless Chrome driver.""" options = Options() options.add_argument("--headless=new") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") options.add_argument("--disable-gpu") options.add_argument("--window-size=1920,1080") options.add_argument( "--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) " "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" ) # Suppress logging noise options.add_argument("--log-level=3") options.add_experimental_option("excludeSwitches", ["enable-logging"]) # Use system chromium if available (Docker/HF Spaces), else use webdriver-manager import os chrome_bin = os.environ.get("CHROME_BIN") chromedriver_path = os.environ.get("CHROMEDRIVER_PATH") if chrome_bin and os.path.exists(chrome_bin): options.binary_location = chrome_bin service = Service(chromedriver_path or "/usr/bin/chromedriver") else: service = Service(ChromeDriverManager().install()) driver = webdriver.Chrome(service=service, options=options) return driver def scrape_google_maps(query: str, limit: int = 50, lat: float = None, lng: float = None, zoom: int = 14, progress_callback=None) -> List[Dict]: """ Scrape Google Maps for business leads. Args: query: Search query (e.g., "gym in Mumbai") limit: Maximum number of results to extract lat: Latitude for radius search lng: Longitude for radius search zoom: Zoom level (radius proxy) progress_callback: Optional callback(found_count, message) for progress updates Returns: List of dicts with keys: name, phone, address, website """ driver = None results = [] try: logger.info(f"Starting scrape for: '{query}' (limit: {limit})") if progress_callback: progress_callback(0, "Launching browser...") driver = create_driver() # Navigate to Google Maps if lat is not None and lng is not None: url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}/@{lat},{lng},{zoom}z" else: url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}" driver.get(url) _random_delay(3, 5) if progress_callback: progress_callback(0, "Loaded Google Maps, scrolling results...") # Wait for results panel to load try: WebDriverWait(driver, 15).until( EC.presence_of_element_located( (By.CSS_SELECTOR, 'div[role="feed"]') ) ) except Exception: logger.warning("Could not find results feed, trying alternate selectors...") # Scroll the results panel to load more items scrollable = None try: scrollable = driver.find_element(By.CSS_SELECTOR, 'div[role="feed"]') except Exception: # Try alternate container try: scrollable = driver.find_element(By.CSS_SELECTOR, 'div[role="main"]') except Exception: logger.error("Could not find scrollable results container") return results # Scroll to load results last_count = 0 scroll_attempts = 0 max_scroll_attempts = 30 while scroll_attempts < max_scroll_attempts: # Find all listing links listings = driver.find_elements(By.CSS_SELECTOR, 'a[href*="/maps/place/"]') current_count = len(listings) if current_count >= limit: break if current_count == last_count: scroll_attempts += 1 else: scroll_attempts = 0 last_count = current_count # Scroll down driver.execute_script( "arguments[0].scrollTop = arguments[0].scrollHeight", scrollable ) _random_delay(1.5, 2.5) if progress_callback: progress_callback(current_count, f"Found {current_count} listings, scrolling for more...") # Check for "end of list" indicator try: end_marker = driver.find_element( By.XPATH, '//*[contains(text(), "end of list")]' ) if end_marker: logger.info("Reached end of results list") break except Exception: pass # Get all listing links (up to limit) listings = driver.find_elements(By.CSS_SELECTOR, 'a[href*="/maps/place/"]') listings = listings[:limit] if progress_callback: progress_callback(len(listings), f"Extracting details from {len(listings)} listings...") logger.info(f"Found {len(listings)} listings, extracting details...") # Extract details from each listing for i, listing in enumerate(listings): try: # Click on the listing to open its detail panel driver.execute_script("arguments[0].click();", listing) _random_delay(2, 3.5) lead = _extract_business_details(driver) if lead and lead.get("name"): results.append(lead) logger.info(f"[{i+1}/{len(listings)}] Extracted: {lead['name']}") if progress_callback: progress_callback(len(results), f"Extracted: {lead['name']}") # Go back to results try: back_btn = driver.find_element( By.CSS_SELECTOR, 'button[aria-label="Back"]' ) back_btn.click() _random_delay(1.5, 2.5) except Exception: driver.back() _random_delay(2, 3) except Exception as e: logger.warning(f"Error extracting listing {i+1}: {e}") continue except Exception as e: logger.error(f"Scraping failed: {e}") if progress_callback: progress_callback(len(results), f"Error: {str(e)}") finally: if driver: driver.quit() logger.info(f"Scraping complete. Total leads extracted: {len(results)}") return results def _extract_business_details(driver) -> Dict: """Extract business details from the currently open listing panel.""" lead = { "name": None, "phone": None, "address": None, "website": None, } try: # Business name — look in the header try: WebDriverWait(driver, 5).until( EC.presence_of_element_located((By.CSS_SELECTOR, "h1")) ) h1_elements = driver.find_elements(By.CSS_SELECTOR, "h1") for h1 in reversed(h1_elements): text = h1.text.strip() if text and text.lower() not in ["results", "search results"]: lead["name"] = text break except Exception: pass # Address - look for the address button/link try: address_el = driver.find_element( By.CSS_SELECTOR, 'button[data-item-id="address"]' ) lead["address"] = address_el.text.strip() except Exception: try: address_el = driver.find_element( By.CSS_SELECTOR, '[data-item-id*="address"]' ) lead["address"] = address_el.text.strip() except Exception: pass # Phone number try: phone_el = driver.find_element( By.CSS_SELECTOR, 'button[data-item-id*="phone"]' ) text = phone_el.text.strip() # Clean phone number phone = "".join(c for c in text if c.isdigit() or c in "+-() ") if phone: lead["phone"] = phone.strip() except Exception: pass # Website try: website_el = driver.find_element( By.CSS_SELECTOR, 'a[data-item-id="authority"]' ) lead["website"] = website_el.get_attribute("href") except Exception: try: website_el = driver.find_element( By.CSS_SELECTOR, '[data-item-id*="authority"]' ) lead["website"] = website_el.text.strip() except Exception: pass except Exception as e: logger.warning(f"Error in detail extraction: {e}") return lead