# ============================================================
# CHANGELOG - google_maps_scraper.py
# ============================================================
# Issue ID | Change Description                        | Lines Affected
# ------------------------------------------------------------
# INIT-01  | ALREADY IMPLEMENTED - no changes needed   | N/A
# INIT-03  | Enhanced error messages for browser init  | scrape_reviews() (lines ~310-330)
# NAV-01   | Added _wait_for_page_load() method        | New method (lines ~260-280)
# NAV-01   | Replaced time.sleep(5) with WebDriverWait | scrape_reviews() (lines ~340-355)
# ============================================================
# IMPORTANT: All other code is UNCHANGED from original working version
# FMT-02 is already NESTED - NO CHANGE NEEDED
# ============================================================

"""
Google Maps Review Scraper - 2025 Production Version

Updated with VERIFIED selectors from actual Google Maps DOM inspection.

Key fixes based on selector documentation:
1. Reviews tab: button with aria-label="Reviews" or containing "Reviews" text
2. Scrollable container: div.m6QErb.DxyBCb OR div.XiKgde OR div[role='feed']
3. Review cards: div.jftiEf.fontBodyMedium with data-review-id
4. Reviewer name: div.d4r55 (no trailing space)
5. Star rating: span.kvMYJc child span with aria-label
6. Date: span.rsqaWe
7. Review text: span.wiI7pd (truncated) or span[jsname='fbQN7e'] (full)
8. More button: button.w8nwRe or button.kyuUzc
"""

import os
import random
import re
import shutil
import time
import traceback
from typing import List, Dict, Any, Optional, Callable

from selenium import webdriver
from selenium.common.exceptions import (
    NoSuchElementException,
    StaleElementReferenceException,
    TimeoutException,
    ElementClickInterceptedException,
    WebDriverException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.keys import Keys


class GoogleMapsScraper:
    """
    Scrapes restaurant reviews from Google Maps.

    VERIFIED selectors (Nov 2025):
    - Review cards: div.jftiEf.fontBodyMedium with data-review-id
    - Reviewer name: div.d4r55
    - Star rating: span.kvMYJc > span[aria-label*="star"]
    - Date: span.rsqaWe
    - Review text: span.wiI7pd (truncated) or span[jsname='fbQN7e'] (full)
    - More button: button.w8nwRe or button.kyuUzc
    - Scrollable container: div.m6QErb.DxyBCb or div.XiKgde
    """

    # VERIFIED selectors from documentation.
    # Each key maps to an ordered list of XPath fallbacks; earlier entries are
    # tried first. Keys whose XPaths start with ".//" are evaluated relative
    # to a single review card element.
    SELECTORS = {
        # Reviews tab button - multiple fallbacks
        "reviews_tab": [
            "//button[contains(@aria-label, 'Reviews')]",
            "//button[@role='tab'][contains(., 'Reviews')]",
            "//button[@role='tab'][contains(., 'reviews')]",
            "//div[@role='tablist']//button[contains(., 'Review')]",
            "//button[@data-tab-index='1']",
            "//button[contains(@class, 'hh2c6')]",
        ],
        # Scrollable reviews container - VERIFIED classes
        "scrollable_div": [
            "//div[contains(@class, 'm6QErb') and contains(@class, 'DxyBCb')]",
            "//div[contains(@class, 'XiKgde')]",
            "//div[@role='feed']",
            "//div[contains(@class, 'm6QErb')][@tabindex='-1']",
            "//div[contains(@class, 'm6QErb')]",
        ],
        # Individual review cards - VERIFIED: div.jftiEf with data-review-id
        "review_cards": [
            "//div[@data-review-id]",
            "//div[contains(@class, 'jftiEf') and contains(@class, 'fontBodyMedium')]",
            "//div[contains(@class, 'jftiEf')]",
        ],
        # Reviewer name - VERIFIED: div.d4r55 (no trailing space!)
        "reviewer_name": [
            ".//div[contains(@class, 'd4r55')]",
            ".//button[contains(@class, 'WEBjve')]//div",
            ".//a[contains(@class, 'WNBkOb')]//div[1]",
        ],
        # Star rating - VERIFIED: span.kvMYJc child with aria-label
        "rating": [
            ".//span[contains(@class, 'kvMYJc')]//span[@aria-label]",
            ".//span[@aria-label][contains(@aria-label, 'star')]",
            ".//div[@role='img'][@aria-label]",
        ],
        # Review date - VERIFIED: span.rsqaWe
        "date": [
            ".//span[contains(@class, 'rsqaWe')]",
            ".//span[contains(text(), 'ago')]",
            ".//span[contains(text(), 'week')]",
            ".//span[contains(text(), 'month')]",
            ".//span[contains(text(), 'day')]",
            ".//span[contains(text(), 'year')]",
        ],
        # Review text - VERIFIED: span.wiI7pd and jsname variants
        "review_text": [
            ".//span[contains(@class, 'wiI7pd')]",
            ".//span[@jsname='fbQN7e']",  # Full expanded text
            ".//span[@jsname='bN97Pc']",  # Truncated text
            ".//div[contains(@class, 'MyEned')]//span",
        ],
        # "More" button - VERIFIED: button.w8nwRe or button.kyuUzc
        "more_button": [
            ".//button[contains(@class, 'w8nwRe')]",
            ".//button[contains(@class, 'kyuUzc')]",
            ".//button[@aria-expanded='false']",
            ".//button[contains(@aria-label, 'More')]",
            ".//button[contains(@aria-label, 'more')]",
            ".//span[text()='More']/parent::button",
            ".//button[.//span[text()='More']]",
        ],
        # [NAV-01] Added selectors for page load verification
        "page_loaded": [
            "//div[contains(@class, 'fontHeadlineSmall')]",  # Restaurant name
            "//button[contains(@aria-label, 'Reviews')]",  # Reviews tab
            "//div[@role='main']",  # Main content
            "//h1",  # Page title
        ],
    }

    def __init__(self, headless: bool = True, chromedriver_path: Optional[str] = None):
        """Initialize the scraper.

        Args:
            headless: Run Chrome without a visible window.
            chromedriver_path: Explicit chromedriver location; when omitted,
                `_find_chromedriver()` is used to pick one.
        """
        self.headless = headless
        # The browser is created lazily by _init_driver() inside scrape_reviews().
        self.driver = None
        self.wait = None
        self.chromedriver_path = chromedriver_path or self._find_chromedriver()

    def _find_chromedriver(self) -> str:
        """Find chromedriver in common locations.

        Lookup order: a fixed list of well-known paths, then the executable
        search PATH, then `webdriver_manager` (if installed). Falls back to
        '/usr/local/bin/chromedriver' when nothing is found.
        """
        common_paths = [
            '/usr/local/bin/chromedriver',
            '/usr/bin/chromedriver',
            '/opt/chromedriver',
            'chromedriver',
        ]
        for path in common_paths:
            if os.path.exists(path):
                return path

        # os.path.exists('chromedriver') only checks the current directory;
        # also honour a chromedriver that is reachable through PATH.
        on_path = shutil.which('chromedriver')
        if on_path:
            return on_path

        try:
            from webdriver_manager.chrome import ChromeDriverManager
            return ChromeDriverManager().install()
        except Exception:
            # Best effort: webdriver_manager is optional (ImportError) and its
            # download may fail; fall through to the default path.
            pass

        return '/usr/local/bin/chromedriver'

    def _init_driver(self) -> None:
        """Initialize Chrome WebDriver with anti-detection settings.

        Sets `self.driver` and `self.wait`. Any exception raised by Selenium
        while starting the browser propagates to the caller.
        """
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless=new')
        chrome_options.add_argument('--no-sandbox')
        chrome_options.add_argument('--disable-dev-shm-usage')
        chrome_options.add_argument('--disable-gpu')
        chrome_options.add_argument('--window-size=1920,1080')
        chrome_options.add_argument('--lang=en-US')

        # Realistic user agent
        chrome_options.add_argument(
            '--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
            'AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36'
        )

        # Anti-detection
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option('useAutomationExtension', False)
        chrome_options.add_argument('--disable-blink-features=AutomationControlled')

        try:
            service = Service(self.chromedriver_path)
            self.driver = webdriver.Chrome(service=service, options=chrome_options)
        except Exception:
            # The configured path did not work; retry without an explicit
            # Service so Selenium resolves the driver itself.
            self.driver = webdriver.Chrome(options=chrome_options)

        self.driver.set_page_load_timeout(60)
        self.wait = WebDriverWait(self.driver, 20)

        # Anti-detection CDP: script evaluated on every new document.
        self.driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': '''
                Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
                Object.defineProperty(navigator, 'plugins', {get: () => [1, 2, 3, 4, 5]});
            '''
        })

    def _cleanup(self) -> None:
        """Close browser. Safe to call repeatedly and on a half-built instance."""
        # getattr: __del__ may run on an object whose __init__ never completed.
        driver = getattr(self, 'driver', None)
        if driver:
            try:
                driver.quit()
            except Exception:
                # Best effort: the browser may already be gone.
                pass
        self.driver = None

    def _random_delay(self, min_sec: float = 0.5, max_sec: float = 1.5) -> None:
        """Sleep for a random duration between `min_sec` and `max_sec` seconds."""
        time.sleep(random.uniform(min_sec, max_sec))

    def _find_elements_with_fallback(self, selectors: List[str]) -> List:
        """Try multiple XPath selectors until one returns elements.

        Returns the matches of the first selector that yields a non-empty
        list, or an empty list when none match.
        """
        for selector in selectors:
            try:
                elements = self.driver.find_elements(By.XPATH, selector)
                if elements:
                    return elements
            except Exception:
                continue
        return []

    def _extract_rating(self, review_element) -> float:
        """Extract star rating from an element's aria-label.

        Returns the number preceding the word "star" (e.g. "4 stars" -> 4.0,
        "4.5 stars" -> 4.5), or 0.0 when no rating can be found.
        """
        for selector in self.SELECTORS["rating"]:
            try:
                elem = review_element.find_element(By.XPATH, selector)
                aria_label = elem.get_attribute('aria-label')
                if aria_label:
                    # Accept an optional decimal part; a plain (\d+) would read
                    # "4.5 stars" as 5 because it matches the digits after the dot.
                    match = re.search(r'(\d+(?:[.,]\d+)?)\s*star', aria_label.lower())
                    if match:
                        return float(match.group(1).replace(',', '.'))
            except (NoSuchElementException, StaleElementReferenceException):
                continue
        return 0.0

    def _extract_text(self, parent, selectors: List[str]) -> str:
        """Return the stripped text of the first selector match that is non-empty.

        Returns "" when no selector yields text.
        """
        for selector in selectors:
            try:
                element = parent.find_element(By.XPATH, selector)
                text = element.text.strip()
                if text:
                    return text
            except (NoSuchElementException, StaleElementReferenceException):
                continue
        return ""

    def _expand_review_text(self, review_element) -> bool:
        """Click 'More' button to expand truncated review.

        Returns True when a visible button was found and clicked, else False.
        """
        for selector in self.SELECTORS["more_button"]:
            try:
                more_btn = review_element.find_element(By.XPATH, selector)
                if more_btn and more_btn.is_displayed():
                    try:
                        more_btn.click()
                    except ElementClickInterceptedException:
                        # Something overlays the button; click through JavaScript.
                        self.driver.execute_script("arguments[0].click();", more_btn)
                    self._random_delay(0.3, 0.6)
                    return True
            except (NoSuchElementException, StaleElementReferenceException):
                continue
        return False

    def _get_scrollable_element(self, progress_callback=None):
        """Find the scrollable reviews container.

        Returns the first element matched by the "scrollable_div" selectors,
        or None when none of them match.
        """
        for selector in self.SELECTORS["scrollable_div"]:
            try:
                element = self.driver.find_element(By.XPATH, selector)
                if element:
                    self._log_progress(
                        f"✅ Found scrollable container with: {selector[:50]}...",
                        progress_callback
                    )
                    return element
            except NoSuchElementException:
                continue
        return None

    def _scroll_reviews(self, scrollable_element, scroll_pause: float = 1.5) -> bool:
        """Scroll the reviews panel to the bottom to load more.

        Returns True when the container's scrollHeight grew after the pause,
        False otherwise (including on any error or a missing element).
        """
        if not scrollable_element:
            return False

        try:
            last_height = self.driver.execute_script(
                "return arguments[0].scrollHeight", scrollable_element
            )
            self.driver.execute_script(
                "arguments[0].scrollTop = arguments[0].scrollHeight", scrollable_element
            )
            time.sleep(scroll_pause + random.uniform(0, 0.5))
            new_height = self.driver.execute_script(
                "return arguments[0].scrollHeight", scrollable_element
            )
            return new_height > last_height
        except Exception as e:
            print(f"Scroll error: {e}")
            return False

    def _click_reviews_tab(self, progress_callback=None) -> bool:
        """Click on the Reviews tab.

        Tries the "reviews_tab" selectors, then any button mentioning
        "review", then a last-resort XPath. Returns True after the first
        successful click, False when nothing could be clicked.
        """
        # First, wait for page to fully load
        time.sleep(3)

        # Try each selector
        for selector in self.SELECTORS["reviews_tab"]:
            try:
                self._log_progress(f"🔍 Trying selector: {selector[:60]}...", progress_callback)
                tab = WebDriverWait(self.driver, 5).until(
                    EC.element_to_be_clickable((By.XPATH, selector))
                )
                tab.click()
                self._log_progress("✅ Clicked Reviews tab", progress_callback)
                time.sleep(3)
                return True
            except (TimeoutException, NoSuchElementException, ElementClickInterceptedException):
                continue

        # Fallback: Try finding any button with "Review" text
        try:
            buttons = self.driver.find_elements(By.TAG_NAME, "button")
            for btn in buttons:
                try:
                    btn_text = btn.text.lower()
                    btn_aria = (btn.get_attribute('aria-label') or '').lower()
                    if 'review' in btn_text or 'review' in btn_aria:
                        self._log_progress(
                            f"🔍 Found button with text: {btn.text[:30]}", progress_callback
                        )
                        btn.click()
                        time.sleep(3)
                        return True
                except Exception:
                    continue
        except Exception:
            pass

        # Last resort: Try clicking on the reviews count text
        try:
            review_count_elem = self.driver.find_element(
                By.XPATH, "//button[contains(., 'review')]"
            )
            review_count_elem.click()
            time.sleep(3)
            return True
        except Exception:
            pass

        return False

    def _handle_consent_dialog(self, progress_callback=None) -> bool:
        """Handle Google consent/cookie dialog if it appears.

        Returns True when a candidate button was clicked, False otherwise.
        Each selector is waited on for up to 3 seconds.
        """
        # NOTE(review): "//form//button" is a very broad last fallback and may
        # match a non-consent button on some page layouts - confirm it is wanted.
        consent_selectors = [
            "//button[contains(., 'Accept all')]",
            "//button[contains(., 'Accept')]",
            "//button[contains(., 'Reject all')]",
            "//button[contains(., 'I agree')]",
            "//form//button",
        ]
        for selector in consent_selectors:
            try:
                btn = WebDriverWait(self.driver, 3).until(
                    EC.element_to_be_clickable((By.XPATH, selector))
                )
                if btn.is_displayed():
                    self._log_progress("🍪 Handling consent dialog...", progress_callback)
                    btn.click()
                    time.sleep(2)
                    return True
            except Exception:
                continue
        return False

    def _debug_page_state(self, progress_callback=None) -> None:
        """Debug: Log page state (title, URL, element counts, tabs) to help diagnose issues."""
        try:
            # Get page title
            title = self.driver.title
            self._log_progress(f"📄 Page title: {title}", progress_callback)

            # Check if we're on the right page
            url = self.driver.current_url
            self._log_progress(f"📄 Current URL: {url[:80]}...", progress_callback)

            # Count some elements
            all_buttons = self.driver.find_elements(By.TAG_NAME, "button")
            all_divs = self.driver.find_elements(By.TAG_NAME, "div")
            self._log_progress(
                f"📄 Page has {len(all_buttons)} buttons, {len(all_divs)} divs",
                progress_callback
            )

            # Look for any tab-like elements
            tabs = self.driver.find_elements(By.XPATH, "//button[@role='tab']")
            self._log_progress(f"📄 Found {len(tabs)} tab buttons", progress_callback)
            for tab in tabs[:5]:
                try:
                    tab_text = tab.text[:30] if tab.text else "(no text)"
                    tab_aria = tab.get_attribute('aria-label') or "(no aria)"
                    self._log_progress(
                        f" Tab: {tab_text} | aria: {tab_aria[:30]}", progress_callback
                    )
                except Exception:
                    pass

            # Look for review-related elements
            review_elements = self.driver.find_elements(
                By.XPATH, "//*[contains(@class, 'jftiEf')]"
            )
            self._log_progress(
                f"📄 Found {len(review_elements)} elements with 'jftiEf' class",
                progress_callback
            )
        except Exception as e:
            self._log_progress(f"⚠️ Debug error: {e}", progress_callback)

    def _extract_review_data(self, review_element, idx: int) -> Optional[Dict]:
        """Extract all data from a single review card.

        Args:
            review_element: The review card element.
            idx: Position of the card in the current listing (used in error logs).

        Returns:
            Dict with 'name', 'date', 'rating' and 'text', or None when the
            card has fewer than 10 characters of text, went stale, or failed.
        """
        try:
            # Try to expand truncated text
            self._expand_review_text(review_element)

            # Extract reviewer name
            name = self._extract_text(review_element, self.SELECTORS["reviewer_name"])

            # Extract date
            date = self._extract_text(review_element, self.SELECTORS["date"])

            # Extract star rating
            rating = self._extract_rating(review_element)

            # Extract review text (try expanded first, then truncated);
            # keep whichever candidate is longest.
            text = ""
            for selector in [".//span[@jsname='fbQN7e']", ".//span[contains(@class, 'wiI7pd')]"]:
                try:
                    elem = review_element.find_element(By.XPATH, selector)
                    t = elem.text.strip()
                    if t and len(t) > len(text):
                        text = t
                except Exception:
                    continue

            if not text:
                text = self._extract_text(review_element, self.SELECTORS["review_text"])

            # Validate
            if not text or len(text) < 10:
                return None

            return {
                'name': name,
                'date': date.strip() if date else "",
                'rating': rating,
                'text': text
            }
        except StaleElementReferenceException:
            return None
        except Exception as e:
            print(f"[GMAPS] Error extracting review {idx}: {e}")
            return None

    # [NAV-01] NEW METHOD - Wait for page to load with specific element
    def _wait_for_page_load(self, timeout: int = 10) -> bool:
        """
        Wait for Google Maps page to load by checking for key elements.

        A single wait of at most `timeout` seconds succeeds as soon as ANY of
        the "page_loaded" selectors matches, so the total wait never exceeds
        `timeout` regardless of how many selectors are configured.

        Returns True if page loaded, False if timeout.
        """
        selectors = self.SELECTORS["page_loaded"]

        def _any_present(driver):
            return any(driver.find_elements(By.XPATH, s) for s in selectors)

        try:
            WebDriverWait(self.driver, timeout).until(_any_present)
            return True
        except TimeoutException:
            return False

    @staticmethod
    def _failure(error: str) -> Dict[str, Any]:
        """Build the failure result dict returned by scrape_reviews()."""
        return {
            'success': False,
            'error': error,
            'reviews': {}
        }

    def scrape_reviews(
        self,
        url: str,
        max_reviews: Optional[int] = None,
        progress_callback: Optional[Callable[[str], None]] = None
    ) -> Dict[str, Any]:
        """Scrape reviews from Google Maps restaurant page.

        Args:
            url: Google Maps URL (validated by `_validate_url`).
            max_reviews: Stop after this many reviews; a falsy value (None or 0)
                means no limit beyond the scroll budget.
            progress_callback: Optional callable receiving each progress message.

        Returns:
            On success: dict with 'success' True, 'total_reviews',
            'total_pages', nested 'reviews' lists and 'metadata'.
            On failure: dict with 'success' False, 'error' and empty 'reviews'.
            Exceptions derived from Exception are reported in the result
            rather than raised. The browser is always closed before returning.
        """
        if not self._validate_url(url):
            return self._failure('Invalid Google Maps URL. Use google.com/maps or goo.gl/maps')

        # [INIT-03] Enhanced error handling with user-friendly messages.
        # _cleanup() in each branch closes a browser that started but failed
        # during later setup (e.g. the CDP command).
        try:
            self._init_driver()
        except FileNotFoundError:
            self._cleanup()
            return self._failure(
                f'Chromedriver not found at {self.chromedriver_path}. Please install Chrome/Chromedriver or set the correct path.'
            )
        except WebDriverException as e:
            self._cleanup()
            error_msg = str(e).lower()
            if 'chromedriver' in error_msg or 'chrome' in error_msg or 'session not created' in error_msg:
                return self._failure(
                    f'Browser initialization failed. Please ensure Chrome and Chromedriver are installed and compatible. Details: {str(e)[:200]}'
                )
            return self._failure(f'Browser initialization failed: {str(e)[:200]}')
        except Exception as e:
            self._cleanup()
            return self._failure(f'Browser initialization failed: {str(e)}')

        try:
            self._log_progress("🚀 Starting Google Maps scraper...", progress_callback)

            # Load the page
            self.driver.get(url)

            # [NAV-01] Use WebDriverWait instead of fixed 5s sleep
            self._log_progress("⏳ Waiting for page to load...", progress_callback)
            if not self._wait_for_page_load(timeout=10):
                # Fallback to short sleep if element not found (page might still work)
                self._log_progress(
                    "⚠️ Page load check timed out, continuing with fallback...",
                    progress_callback
                )
                time.sleep(3)
            else:
                self._log_progress("✅ Page loaded successfully", progress_callback)

            # Handle consent dialog if present
            self._handle_consent_dialog(progress_callback)

            # Debug: check page state
            self._debug_page_state(progress_callback)

            # Click Reviews tab
            self._log_progress("📋 Looking for Reviews tab...", progress_callback)
            if not self._click_reviews_tab(progress_callback):
                self._log_progress(
                    "⚠️ Could not find Reviews tab, trying to scroll anyway...",
                    progress_callback
                )
                # Try scrolling down to trigger lazy loading
                self.driver.execute_script("window.scrollBy(0, 500);")
                time.sleep(2)

            time.sleep(3)

            # Debug again after clicking tab
            self._debug_page_state(progress_callback)

            # Find scrollable container
            scrollable = self._get_scrollable_element(progress_callback)
            if not scrollable:
                self._log_progress(
                    "⚠️ Could not find scrollable reviews container", progress_callback
                )

            # Initialize data containers
            names = []
            dates = []
            ratings = []
            review_texts = []
            seen_texts = set()  # O(1) duplicate check mirroring review_texts
            processed_ids = set()

            scroll_count = 0
            no_new_reviews_count = 0
            max_no_new = 5  # give up after this many scrolls without new reviews
            max_scrolls = (max_reviews // 3) + 20 if max_reviews else 100

            while scroll_count < max_scrolls and no_new_reviews_count < max_no_new:
                scroll_count += 1

                # Find all review cards
                review_elements = self._find_elements_with_fallback(
                    self.SELECTORS["review_cards"]
                )
                self._log_progress(
                    f"📄 Scroll {scroll_count}: Found {len(review_elements)} review cards, "
                    f"collected {len(review_texts)} unique reviews",
                    progress_callback
                )

                new_reviews_this_scroll = 0
                for idx, review_elem in enumerate(review_elements):
                    if max_reviews and len(review_texts) >= max_reviews:
                        break

                    try:
                        review_id = review_elem.get_attribute('data-review-id')
                        if not review_id:
                            review_id = f"pos_{idx}_{review_elem.location['y']}"
                    except Exception:
                        # Element could not be queried (e.g. stale); this id is
                        # unique per scroll so the card is retried next round.
                        review_id = f"idx_{idx}_{scroll_count}"

                    if review_id in processed_ids:
                        continue

                    review_data = self._extract_review_data(review_elem, idx)
                    if review_data and review_data['text'] not in seen_texts:
                        seen_texts.add(review_data['text'])
                        names.append(review_data['name'])
                        dates.append(review_data['date'])
                        ratings.append(review_data['rating'])
                        review_texts.append(review_data['text'])
                        new_reviews_this_scroll += 1

                    # Each identified card is attempted once, so cards without
                    # usable text are not re-expanded on every scroll.
                    processed_ids.add(review_id)

                    if idx % 5 == 0:
                        self._random_delay(0.1, 0.3)

                if new_reviews_this_scroll == 0:
                    no_new_reviews_count += 1
                else:
                    no_new_reviews_count = 0

                if max_reviews and len(review_texts) >= max_reviews:
                    self._log_progress(
                        f"🎯 Reached target: {max_reviews} reviews", progress_callback
                    )
                    break

                # Scroll
                if scrollable:
                    self._scroll_reviews(scrollable)
                else:
                    self.driver.execute_script("window.scrollBy(0, 500);")
                    time.sleep(1.5)

            # Trim to max_reviews
            if max_reviews:
                names = names[:max_reviews]
                dates = dates[:max_reviews]
                ratings = ratings[:max_reviews]
                review_texts = review_texts[:max_reviews]

            self._log_progress(
                f"✅ Scraped {len(review_texts)} reviews from Google Maps",
                progress_callback
            )

            # Return NESTED format (already correct - FMT-02)
            return {
                'success': True,
                'total_reviews': len(review_texts),
                'total_pages': scroll_count,
                'reviews': {
                    'names': names,
                    'dates': dates,
                    'overall_ratings': ratings,
                    # Google Maps exposes one overall rating only; the
                    # per-category lists are zero-filled to the same length.
                    'food_ratings': [0.0] * len(ratings),
                    'service_ratings': [0.0] * len(ratings),
                    'ambience_ratings': [0.0] * len(ratings),
                    'review_texts': review_texts
                },
                'metadata': {
                    'source': 'google_maps',
                    'url': url,
                    'scroll_count': scroll_count
                }
            }
        except Exception as e:
            traceback.print_exc()
            return self._failure(str(e))
        finally:
            # Always release the browser, including on KeyboardInterrupt.
            self._cleanup()

    def _validate_url(self, url: str) -> bool:
        """Return True when `url` contains one of the known Google Maps URL fragments."""
        if not url:
            return False
        url_lower = url.lower()
        return any(x in url_lower for x in [
            'google.com/maps',
            'goo.gl/maps',
            'maps.google',
            'maps.app.goo.gl'
        ])

    def _log_progress(self, message: str, callback: Optional[Callable]) -> None:
        """Print `message` and forward it to `callback` when one is given."""
        print(message)
        if callback:
            callback(message)

    def __del__(self):
        self._cleanup()


def scrape_google_maps(
    url: str,
    max_reviews: Optional[int] = None,
    headless: bool = True,
    chromedriver_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Scrape reviews from Google Maps.

    Args:
        url: Google Maps restaurant URL
        max_reviews: Maximum number of reviews to scrape (None = all available)
        headless: Run browser in headless mode
        chromedriver_path: Optional path to chromedriver

    Returns:
        Dict with 'success', 'total_reviews', and 'reviews' data in NESTED format
    """
    scraper = GoogleMapsScraper(headless=headless, chromedriver_path=chromedriver_path)
    return scraper.scrape_reviews(url, max_reviews=max_reviews)


def _main() -> None:
    """Manual smoke test: scrape 20 reviews from a sample place and print a summary."""
    print("=" * 80)
    print("Google Maps Review Scraper - Production Test (Nov 2025)")
    print("=" * 80 + "\n")

    test_url = "https://www.google.com/maps/place/Tutto+Italian+Restaurant+%26+Bar"

    print(f"Target: {test_url}")
    print("Limit: 20 reviews (test mode)")
    print("Mode: HEADLESS\n")

    result = scrape_google_maps(test_url, max_reviews=20, headless=True)

    print("\n" + "=" * 80)
    if result['success']:
        print("SUCCESS!")
        print(f" Total reviews scraped: {result['total_reviews']}")
        print(f" Scroll iterations: {result.get('total_pages', 'N/A')}")

        if result['total_reviews'] > 0:
            print("\n Sample (first review):")
            print(f" Name: {result['reviews']['names'][0]}")
            print(f" Date: {result['reviews']['dates'][0]}")
            print(f" Rating: {result['reviews']['overall_ratings'][0]}")
            text = result['reviews']['review_texts'][0]
            print(f" Review: {text[:150]}{'...' if len(text) > 150 else ''}")
    else:
        print("FAILED")
        print(f" Error: {result.get('error', 'Unknown error')}")

    print("=" * 80)


if __name__ == "__main__":
    _main()