Spaces:
Sleeping
Sleeping
| """ | |
| Google Maps scraper using Selenium. | |
| Extracts business name, address, phone, and website from search results. | |
| """ | |
| import time | |
| import random | |
| import logging | |
| from typing import List, Dict | |
| from selenium import webdriver | |
| from selenium.webdriver.chrome.service import Service | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.common.keys import Keys | |
| from selenium.webdriver.support.ui import WebDriverWait | |
| from selenium.webdriver.support import expected_conditions as EC | |
| from webdriver_manager.chrome import ChromeDriverManager | |
| logger = logging.getLogger(__name__) | |
| def _random_delay(min_sec: float = 1.5, max_sec: float = 3.5): | |
| """Add a random delay to mimic human behavior.""" | |
| time.sleep(random.uniform(min_sec, max_sec)) | |
def create_driver() -> webdriver.Chrome:
    """Build and return a headless Chrome WebDriver.

    Prefers a system chromium binary when the CHROME_BIN env var points at an
    existing file (Docker / HF Spaces images, paired with CHROMEDRIVER_PATH);
    otherwise falls back to a driver auto-downloaded by webdriver-manager.
    """
    import os

    opts = Options()
    for flag in (
        "--headless=new",
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
        # Spoof a desktop UA so Maps serves the full results page.
        "--user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
        "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        # Suppress chromedriver logging noise.
        "--log-level=3",
    ):
        opts.add_argument(flag)
    opts.add_experimental_option("excludeSwitches", ["enable-logging"])

    chrome_bin = os.environ.get("CHROME_BIN")
    driver_path = os.environ.get("CHROMEDRIVER_PATH")
    if chrome_bin and os.path.exists(chrome_bin):
        # Containerized environment: use the preinstalled browser and driver.
        opts.binary_location = chrome_bin
        svc = Service(driver_path or "/usr/bin/chromedriver")
    else:
        # Local development: fetch a matching chromedriver on demand.
        svc = Service(ChromeDriverManager().install())

    return webdriver.Chrome(service=svc, options=opts)
def scrape_google_maps(query: str, limit: int = 50, lat: float = None, lng: float = None, zoom: int = 14, progress_callback=None) -> List[Dict]:
    """
    Scrape Google Maps for business leads.

    Args:
        query: Search query (e.g., "gym in Mumbai").
        limit: Maximum number of results to extract.
        lat: Latitude for radius search (used only when lng is also given).
        lng: Longitude for radius search.
        zoom: Zoom level, acting as a radius proxy in the Maps URL.
        progress_callback: Optional callback(found_count, message) invoked at
            each stage for UI progress updates.

    Returns:
        List of dicts with keys: name, phone, address, website.
    """
    # Local import keeps the file's top-level import block untouched.
    from urllib.parse import quote_plus

    driver = None
    results = []
    try:
        logger.info(f"Starting scrape for: '{query}' (limit: {limit})")
        if progress_callback:
            progress_callback(0, "Launching browser...")
        driver = create_driver()

        # FIX: URL-encode the full query instead of only replacing spaces with
        # '+', so queries containing '&', '#', '?', '/', or non-ASCII text no
        # longer produce a malformed search URL.
        encoded_query = quote_plus(query)
        if lat is not None and lng is not None:
            url = f"https://www.google.com/maps/search/{encoded_query}/@{lat},{lng},{zoom}z"
        else:
            url = f"https://www.google.com/maps/search/{encoded_query}"
        driver.get(url)
        _random_delay(3, 5)
        if progress_callback:
            progress_callback(0, "Loaded Google Maps, scrolling results...")

        # Wait for the results panel to load; absence is non-fatal because a
        # fallback container is tried below.
        try:
            WebDriverWait(driver, 15).until(
                EC.presence_of_element_located(
                    (By.CSS_SELECTOR, 'div[role="feed"]')
                )
            )
        except Exception:
            logger.warning("Could not find results feed, trying alternate selectors...")

        # Locate the scrollable results container (feed, then main panel).
        scrollable = None
        try:
            scrollable = driver.find_element(By.CSS_SELECTOR, 'div[role="feed"]')
        except Exception:
            try:
                scrollable = driver.find_element(By.CSS_SELECTOR, 'div[role="main"]')
            except Exception:
                logger.error("Could not find scrollable results container")
                return results

        # Scroll until we have `limit` listings, Google signals the end of the
        # list, or the count stops growing for max_scroll_attempts scrolls.
        last_count = 0
        scroll_attempts = 0
        max_scroll_attempts = 30
        while scroll_attempts < max_scroll_attempts:
            listings = driver.find_elements(By.CSS_SELECTOR, 'a[href*="/maps/place/"]')
            current_count = len(listings)
            if current_count >= limit:
                break
            if current_count == last_count:
                scroll_attempts += 1
            else:
                scroll_attempts = 0
                last_count = current_count
            driver.execute_script(
                "arguments[0].scrollTop = arguments[0].scrollHeight", scrollable
            )
            _random_delay(1.5, 2.5)
            if progress_callback:
                progress_callback(current_count, f"Found {current_count} listings, scrolling for more...")
            # Stop early when the "end of list" marker appears.
            try:
                end_marker = driver.find_element(
                    By.XPATH, '//*[contains(text(), "end of list")]'
                )
                if end_marker:
                    logger.info("Reached end of results list")
                    break
            except Exception:
                pass

        # Collect the final set of listing links, capped at `limit`.
        listings = driver.find_elements(By.CSS_SELECTOR, 'a[href*="/maps/place/"]')
        listings = listings[:limit]
        if progress_callback:
            progress_callback(len(listings), f"Extracting details from {len(listings)} listings...")
        logger.info(f"Found {len(listings)} listings, extracting details...")

        # Open each listing's detail panel and extract contact fields.
        # NOTE(review): elements collected above can go stale after the back
        # navigation below; stale ones are skipped by the except clause rather
        # than re-queried — confirm acceptable loss rate before changing.
        for i, listing in enumerate(listings):
            try:
                # JS click avoids interception by overlaying elements.
                driver.execute_script("arguments[0].click();", listing)
                _random_delay(2, 3.5)
                lead = _extract_business_details(driver)
                if lead and lead.get("name"):
                    results.append(lead)
                    logger.info(f"[{i+1}/{len(listings)}] Extracted: {lead['name']}")
                    if progress_callback:
                        progress_callback(len(results), f"Extracted: {lead['name']}")
                # Return to the results list for the next listing.
                try:
                    back_btn = driver.find_element(
                        By.CSS_SELECTOR, 'button[aria-label="Back"]'
                    )
                    back_btn.click()
                    _random_delay(1.5, 2.5)
                except Exception:
                    driver.back()
                    _random_delay(2, 3)
            except Exception as e:
                logger.warning(f"Error extracting listing {i+1}: {e}")
                continue
    except Exception as e:
        logger.error(f"Scraping failed: {e}")
        if progress_callback:
            progress_callback(len(results), f"Error: {str(e)}")
    finally:
        if driver:
            driver.quit()
    logger.info(f"Scraping complete. Total leads extracted: {len(results)}")
    return results
def _extract_business_details(driver) -> Dict:
    """Pull name, address, phone, and website out of the open listing panel.

    Each field is best-effort: a selector that fails simply leaves the field
    as None. Returns a dict with keys: name, phone, address, website.
    """
    lead = {"name": None, "phone": None, "address": None, "website": None}
    try:
        # Name: wait for the detail header, then take the last <h1> whose text
        # is not one of the generic panel titles.
        try:
            WebDriverWait(driver, 5).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, "h1"))
            )
            for header in reversed(driver.find_elements(By.CSS_SELECTOR, "h1")):
                title = header.text.strip()
                if title and title.lower() not in ["results", "search results"]:
                    lead["name"] = title
                    break
        except Exception:
            pass

        # Address: exact data-item-id first, substring match as fallback.
        for selector in ('button[data-item-id="address"]', '[data-item-id*="address"]'):
            try:
                lead["address"] = driver.find_element(
                    By.CSS_SELECTOR, selector
                ).text.strip()
                break
            except Exception:
                continue

        # Phone: keep only digits and common phone punctuation.
        try:
            raw = driver.find_element(
                By.CSS_SELECTOR, 'button[data-item-id*="phone"]'
            ).text.strip()
            cleaned = "".join(ch for ch in raw if ch.isdigit() or ch in "+-() ")
            if cleaned:
                lead["phone"] = cleaned.strip()
        except Exception:
            pass

        # Website: prefer the anchor's href; fall back to the element's text.
        try:
            lead["website"] = driver.find_element(
                By.CSS_SELECTOR, 'a[data-item-id="authority"]'
            ).get_attribute("href")
        except Exception:
            try:
                lead["website"] = driver.find_element(
                    By.CSS_SELECTOR, '[data-item-id*="authority"]'
                ).text.strip()
            except Exception:
                pass
    except Exception as e:
        logger.warning(f"Error in detail extraction: {e}")
    return lead