File size: 6,718 Bytes
b165957
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import argparse
import json
import os
import time
from urllib.parse import quote_plus

from playwright.sync_api import sync_playwright

def _apply_stealth(page) -> None:
    """Apply playwright-stealth evasions to *page* if the package is installed.

    Best-effort: any import or application failure is logged and ignored so
    scraping proceeds without stealth.
    """
    try:
        from playwright_stealth import stealth
        # Depending on the installed version, `stealth` is either a module
        # exposing a `stealth()` function or the callable itself.
        if hasattr(stealth, 'stealth'):
            stealth.stealth(page)
        else:
            stealth(page)
        print("[+] Stealth applied.")
    except Exception as e:
        print(f"[!] Stealth not applied: {e}")


def _scroll_feed(page, max_results) -> None:
    """Scroll the Google Maps sidebar feed until enough results are loaded.

    Stops when `max_results` article items are visible, the end-of-list
    sentinel appears, or 15 consecutive scrolls yield no new items.
    """
    stale_scrolls = 0
    last_count = 0

    print(f"[*] Scrolling feed to collect up to {max_results} results...")
    while stale_scrolls < 15:
        # Scroll the feed container itself — Google Maps renders results in a
        # scrollable sidebar, so scrolling the whole page loads nothing new.
        try:
            page.evaluate(
                """() => {
                    const feed = document.querySelector('div[role="feed"]');
                    if (feed) feed.scrollBy(0, 2500);
                }"""
            )
        except Exception:
            page.mouse.wheel(0, 3000)  # fallback: synthetic wheel event

        time.sleep(2.5)  # give lazy-loaded results time to render

        current_count = len(page.locator('div[role="feed"] div[role="article"]').all())
        print(f"    Items visible: {current_count}")

        if current_count >= max_results:
            print(f"[+] Reached target count ({max_results}). Stopping scroll.")
            break

        if current_count == last_count:
            stale_scrolls += 1
            print(f"    No new items (stale {stale_scrolls}/15)...")
        else:
            stale_scrolls = 0
        last_count = current_count

        # Google appends an explicit end-of-list marker to the feed.
        try:
            if page.locator("text=You've reached the end of the list").count() > 0:
                print("[+] End of list reached.")
                break
        except Exception:
            pass


def _extract_lead(item, query):
    """Extract one business record from a feed article locator.

    Returns a dict with keys name/website/phone/rating/search_query, or
    None when no business name could be determined.
    """
    # Name from aria-label (most reliable), falling back to the title div.
    name = item.get_attribute("aria-label") or ""
    if not name:
        nl = item.locator('div.qBF1Pd')
        if nl.count() > 0:
            name = nl.first.inner_text()
    if not name:
        return None

    # Website: dedicated website anchor first, then any non-Google link.
    website = ""
    for sel in ('a.lcr4fd', 'a[data-value="Website"]', 'a[href^="http"]:not([href*="google"])'):
        wl = item.locator(sel)
        if wl.count() > 0:
            website = wl.first.get_attribute("href") or ""
            if website:
                break

    # Phone: Google's class names are obfuscated and rotate, so try several.
    phone = ""
    for sel in ('span.Us7fWe', 'span.UsdlK', 'button[data-item-id*="phone"] div.fontBodyMedium'):
        pl = item.locator(sel)
        if pl.count() > 0:
            phone = pl.first.inner_text().strip()
            if phone:
                break

    # Rating: numeric span, falling back to the "X stars" aria-label.
    rating = ""
    rl = item.locator('span.MW4etd')
    if rl.count() > 0:
        rating = rl.first.inner_text().strip()
    else:
        rl2 = item.locator('span[role="img"][aria-label*="stars"]')
        if rl2.count() > 0:
            aria = rl2.first.get_attribute("aria-label") or ""
            rating = aria.split(" ")[0]

    return {
        "name": name,
        "website": website,
        "phone": phone,
        "rating": rating,
        "search_query": query,
    }


def scrape_google_maps(query, max_results=50):
    """
    Scrape Google Maps for *query* and return a list of business dicts.

    Each lead has keys: name, website, phone, rating, search_query.
    Scrolls the sidebar feed panel to load results up to max_results.
    Returns [] when the page or the results feed fails to load.
    """
    leads = []

    with sync_playwright() as p:
        print("[*] Launching browser...")
        browser = p.chromium.launch(headless=True)
        try:
            context = browser.new_context(
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
                locale="en-US",
                viewport={"width": 1280, "height": 900},
            )
            page = context.new_page()
            _apply_stealth(page)

            print(f"[*] Searching: '{query}'")
            # quote_plus percent-encodes '&', '#', '?' and non-ASCII; a bare
            # replace(' ', '+') produced malformed URLs for such queries.
            search_url = f"https://www.google.com/maps/search/{quote_plus(query)}"

            try:
                page.goto(search_url, timeout=60000)
            except Exception as e:
                print(f"[!] Page load failed: {e}")
                return []

            # Wait for the results feed to appear before scrolling it.
            try:
                print("[*] Waiting for results feed...")
                page.wait_for_selector('div[role="feed"]', timeout=25000)
                print("[+] Results feed found.")
            except Exception as e:
                print(f"[!] Results feed not found: {e}")
                return []

            _scroll_feed(page, max_results)

            items = page.locator('div[role="feed"] div[role="article"]').all()
            final_items = items[:max_results]
            print(f"[*] Extracting data from {len(final_items)} items...")

            for i, item in enumerate(final_items):
                try:
                    lead = _extract_lead(item, query)
                    if lead is None:
                        continue
                    print(f"  [{i+1}/{len(final_items)}] {lead['name']}")
                    if lead not in leads:  # drop exact-duplicate entries
                        leads.append(lead)
                except Exception as e:
                    print(f"[!] Error on item {i}: {e}")
        finally:
            # Guarantee shutdown even if extraction raises unexpectedly;
            # the original skipped close() on an escaping exception.
            browser.close()

        print(f"[+] Done. Collected {len(leads)} unique leads.")

    return leads


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Google Maps Scraper')
    parser.add_argument('--niche', required=True, help='Business type to search for, e.g. "plumbers"')
    parser.add_argument('--location', required=True, help='City or region to search in')
    parser.add_argument('--limit', type=int, default=10, help='Maximum number of leads to collect')
    args = parser.parse_args()

    full_query = f"{args.niche} in {args.location}"
    results = scrape_google_maps(full_query, args.limit)

    os.makedirs(".tmp", exist_ok=True)
    # Explicit UTF-8 keeps the output platform-independent (Windows defaults
    # to a legacy codepage); ensure_ascii=False keeps non-ASCII business
    # names human-readable in the JSON file.
    with open(".tmp/raw_leads.json", "w", encoding="utf-8") as f:
        json.dump(results, f, indent=4, ensure_ascii=False)
    print(f"[+] Saved {len(results)} leads to .tmp/raw_leads.json")