"""Hybrid client finder: OSM (Overpass) search + Google Places enrichment + scraper fallback.

The Gradio UI lets the user pick a country, a subdivision and a set of OSM
``amenity`` categories.  Matching places are fetched from Overpass, optionally
enriched with phone/website data, filtered to those with a phone number, and
shown on a map, in a table and as a downloadable CSV.
"""

import base64
import html
import os
import time
from urllib.parse import quote

import folium
import gradio as gr
import overpy
import pandas as pd
import pycountry
import requests
from bs4 import BeautifulSoup
from folium.plugins import MarkerCluster
from geopy.exc import GeopyError
from geopy.geocoders import Nominatim

# -------------------------------------------------------------------
# Setup
# -------------------------------------------------------------------
geolocator = Nominatim(user_agent="hf-saas-dashboard")
api = overpy.Overpass()

# Read the key from the environment so no credential lives in the source.
# An empty value disables the Google Places enrichment step.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")

# Seconds to wait for any single outbound HTTP request before giving up.
REQUEST_TIMEOUT = 10

# Column layout of the frame produced by fetch_places(); declared up front so
# an empty result still has the columns later steps rely on.
PLACE_COLUMNS = ["name", "amenity", "lat", "lon", "phone", "website", "osm_id"]


# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------
def list_countries():
    """Return two parallel lists (names, alpha-2 codes) sorted by country name."""
    items = sorted(
        ((c.name, c.alpha_2) for c in pycountry.countries),
        key=lambda item: item[0],
    )
    names = [name for name, _ in items]
    codes = [code for _, code in items]
    return names, codes


def list_subdivisions(country_code):
    """Return two parallel lists (names, codes) of a country's subdivisions.

    Both lists are empty when pycountry reports no subdivisions (or ``None``)
    for ``country_code``.
    """
    # pycountry may return None instead of an empty collection.
    subs = list(pycountry.subdivisions.get(country_code=country_code) or [])
    if not subs:
        return [], []
    items = sorted(((s.name, s.code) for s in subs), key=lambda item: item[0])
    names = [name for name, _ in items]
    codes = [code for _, code in items]
    return names, codes


def geocode_region(name, country_code):
    """Geocode a region with Nominatim and return its bounding box.

    Returns ``(south, west, north, east)`` as floats, or ``None`` when the
    region cannot be resolved or the result carries no bounding box.
    """
    query = f"{name}, {country_code}"
    location = geolocator.geocode(query, exactly_one=True, addressdetails=False)
    if not location:
        return None
    raw_bbox = location.raw.get("boundingbox")
    if not raw_bbox:
        return None
    # Nominatim orders the box as [south, north, west, east]; reorder it to
    # the (south, west, north, east) layout Overpass expects.
    south, north, west, east = (float(x) for x in raw_bbox)
    return (south, west, north, east)


def fetch_places(amenities, bbox):
    """Fetch places with the given amenities inside a bounding box via Overpass.

    ``amenities`` is an iterable of OSM ``amenity`` values and ``bbox`` is
    ``(south, west, north, east)``.  Returns a DataFrame with the columns in
    ``PLACE_COLUMNS``; rows without coordinates are dropped.
    """
    south, west, north, east = bbox
    amen_regex = "|".join(amenities)
    # Group the alternation: "^a|b$" would match anything *starting* with "a"
    # or *ending* with "b" rather than exactly "a" or "b".
    selector = f'["amenity"~"^({amen_regex})$"]'
    area = f"({south},{west},{north},{east})"
    query = f"""
    (
      node{selector}{area};
      way{selector}{area};
      relation{selector}{area};
    );
    out center tags;
    """
    result = api.query(query)

    def parse_tags(tags, lat, lon, osm_id):
        return {
            "name": tags.get("name", ""),
            "amenity": tags.get("amenity", ""),
            # overpy yields Decimal coordinates; plain floats are friendlier
            # to pandas arithmetic and to the map layer.
            "lat": float(lat) if lat is not None else None,
            "lon": float(lon) if lon is not None else None,
            "phone": tags.get("phone", ""),
            "website": tags.get("website", ""),
            "osm_id": osm_id,
        }

    rows = [
        parse_tags(node.tags, node.lat, node.lon, f"node/{node.id}")
        for node in result.nodes
    ]
    for kind, elements in (("way", result.ways), ("relation", result.relations)):
        for element in elements:
            rows.append(
                parse_tags(
                    element.tags,
                    getattr(element, "center_lat", None),
                    getattr(element, "center_lon", None),
                    f"{kind}/{element.id}",
                )
            )

    # Explicit columns keep dropna() from raising KeyError on an empty result.
    return pd.DataFrame(rows, columns=PLACE_COLUMNS).dropna(subset=["lat", "lon"])


# -------------------------------
# Google API Enrichment
# -------------------------------
def enrich_with_google_api(df, log_msgs):
    """Enrich OSM results with Google Places API data (phone + website).

    Rows that already have both a phone and a website are skipped.  ``df`` is
    updated in place and returned; a summary line is appended to ``log_msgs``.
    A failed or malformed response for one row skips that row only.
    """
    search_url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    details_url = "https://maps.googleapis.com/maps/api/place/details/json"
    used = 0
    for i, row in df.iterrows():
        if row["phone"] and row["website"]:
            continue  # already have data
        query = f"{row['name']} near {row['lat']},{row['lon']}"
        try:
            search = requests.get(
                search_url,
                params={"query": query, "key": GOOGLE_API_KEY},
                timeout=REQUEST_TIMEOUT,
            ).json()
            results = search.get("results")
            if not results:
                continue
            # Fetch details for the best match.
            details = requests.get(
                details_url,
                params={
                    "place_id": results[0]["place_id"],
                    "fields": "formatted_phone_number,website",
                    "key": GOOGLE_API_KEY,
                },
                timeout=REQUEST_TIMEOUT,
            ).json().get("result", {})
        except (requests.RequestException, ValueError, KeyError):
            # Network failure, non-JSON body or unexpected payload shape.
            continue
        df.at[i, "phone"] = details.get("formatted_phone_number", row["phone"])
        df.at[i, "website"] = details.get("website", row["website"])
        used += 1

    if used > 0:
        log_msgs.append(f"✅ Google API enriched {used} rows.")
    else:
        log_msgs.append("⚠️ Google API did not enrich any rows.")
    return df


# -------------------------------
# Google Scraper Fallback
# -------------------------------
def scrape_google_maps(name, lat, lon):
    """Scrape the Google Maps web UI for a phone number (last resort, experimental).

    Returns the text of the first ``<span>`` containing both a ``+`` and a
    digit, or ``None`` when the request fails or nothing matches.
    """
    # The name is free text from OSM and must be percent-encoded for a URL path.
    search_url = (
        f"https://www.google.com/maps/search/{quote(str(name), safe='')}"
        f"/@{lat},{lon},15z"
    )
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        r = requests.get(search_url, headers=headers, timeout=REQUEST_TIMEOUT)
    except requests.RequestException:
        return None
    if r.status_code != 200:
        return None
    soup = BeautifulSoup(r.text, "html.parser")
    for span in soup.find_all("span"):
        text = span.text
        if "+" in text and any(c.isdigit() for c in text):
            return text.strip()
    return None


def enrich_with_scraper(df, log_msgs):
    """Fill missing phone numbers via ``scrape_google_maps``.

    ``df`` is updated in place and returned; a summary line is appended to
    ``log_msgs``.
    """
    used = 0
    for i, row in df.iterrows():
        if row["phone"]:
            continue
        phone = scrape_google_maps(row["name"], row["lat"], row["lon"])
        if phone:
            df.at[i, "phone"] = phone
            used += 1
    if used > 0:
        log_msgs.append(f"✅ Scraper fallback enriched {used} rows.")
    else:
        log_msgs.append("⚠️ Scraper fallback did not find more phones.")
    return df


# -------------------------------
# Utilities
# -------------------------------
def clean_phone_for_whatsapp(phone):
    """Convert a phone number into WhatsApp-friendly format (digits only, keep +).

    Returns ``None`` for empty or non-string input and when nothing usable
    remains after cleaning.
    """
    if not phone or not isinstance(phone, str):
        return None
    cleaned = "".join(c for c in phone if c.isdigit() or c == "+")
    return cleaned if cleaned else None


def _whatsapp_url(phone):
    """Return the wa.me link for ``phone``, or "" when it has no usable digits."""
    cleaned = clean_phone_for_whatsapp(phone)
    return f"https://wa.me/{cleaned}" if cleaned else ""


def df_to_csv_bytes(df):
    """Serialize ``df`` to UTF-8 encoded CSV bytes without the index column."""
    return df.to_csv(index=False).encode("utf-8")


def make_map(df, center_bbox=None):
    """Render the places in ``df`` as a clustered folium map and return its HTML.

    The map is centred on ``center_bbox`` (``(south, west, north, east)``)
    when given, otherwise on the mean coordinate of the rows.  An empty frame
    yields a world overview map.
    """
    if df.empty:
        return folium.Map(location=[20, 0], zoom_start=2)._repr_html_()

    if center_bbox:
        south, west, north, east = center_bbox
        center_lat = (south + north) / 2
        center_lon = (west + east) / 2
    else:
        center_lat = df["lat"].mean()
        center_lon = df["lon"].mean()

    m = folium.Map(location=[center_lat, center_lon], zoom_start=8)
    cluster = MarkerCluster().add_to(m)

    for _, row in df.iterrows():
        # OSM / scraped text is untrusted: escape it before embedding in HTML.
        name = html.escape(str(row["name"] or "Unnamed Place"))
        phone = html.escape(str(row["phone"] or "N/A"))
        website = html.escape(str(row["website"] or "N/A"))
        amenity = html.escape(str(row["amenity"]))
        wa_link = _whatsapp_url(row["phone"])
        wa_html = (
            f'<a href="{html.escape(wa_link)}" target="_blank">💬 WhatsApp</a>'
            if wa_link
            else ""
        )
        popup_html = f"""
        <b>{name}</b><br>
        📞 {phone}<br>
        🌐 {website}<br>
        🍴 {amenity}<br>
        {wa_html}
        """
        folium.Marker(
            [row["lat"], row["lon"]],
            popup=folium.Popup(popup_html, max_width=300),
            tooltip=row["name"] if row["name"] else row["amenity"],
        ).add_to(cluster)
    return m._repr_html_()


# -------------------------------------------------------------------
# Gradio Callbacks
# -------------------------------------------------------------------
COUNTRY_NAMES, COUNTRY_CODES = list_countries()


def update_states(selected_country_name):
    """Return a Gradio update that repopulates the subdivision dropdown."""
    try:
        idx = COUNTRY_NAMES.index(selected_country_name)
    except ValueError:
        return gr.update(choices=[])
    names, _ = list_subdivisions(COUNTRY_CODES[idx])
    return gr.update(choices=names, value=(names[0] if names else None))


def run_search(country_name, state_name, categories):
    """Run the full search pipeline for the UI.

    Returns ``(status message, display DataFrame, map HTML, download link
    HTML)``; the last three are ``None`` when the search cannot produce
    results.
    """
    start = time.time()
    log_msgs = []

    try:
        idx = COUNTRY_NAMES.index(country_name)
    except ValueError:
        return "Invalid country", None, None, None
    country_code = COUNTRY_CODES[idx]

    # Validate cheap inputs before spending a geocoding request.
    if not categories:
        return "Please select at least one category.", None, None, None

    region = state_name if state_name else country_name
    try:
        bbox = geocode_region(region, country_code)
    except GeopyError as exc:
        return f"Geocoding failed: {exc}", None, None, None
    if bbox is None:
        return f"Could not geocode region '{region}'.", None, None, None

    try:
        df = fetch_places(categories, bbox)
    except overpy.exception.OverPyException as exc:
        return f"Overpass query failed: {exc}", None, None, None
    log_msgs.append(f"ℹ️ OSM returned {len(df)} places.")

    # Try Google API enrichment (only when a key has been configured).
    if GOOGLE_API_KEY:
        df = enrich_with_google_api(df, log_msgs)

    # Fallback scraper if still missing phones
    df = enrich_with_scraper(df, log_msgs)

    # 🔑 Keep only businesses with phone numbers
    has_phone = df["phone"].fillna("").astype(str).str.strip() != ""
    df = df[has_phone].reset_index(drop=True)

    if df.empty:
        return "No businesses with phone numbers found.", None, None, None

    # Add WhatsApp links
    df["WhatsApp"] = df["phone"].apply(_whatsapp_url)

    # Keep lat/lon for map rendering
    df_for_map = df.copy()

    # Drop lat/lon/osm_id for display table & CSV
    df_display = df[["name", "amenity", "phone", "website", "WhatsApp"]].copy()

    # Export the CSV while the WhatsApp column still holds the plain URL,
    # i.e. before it is rewritten as markup for the on-screen table.
    csv_bytes = df_to_csv_bytes(df_display)

    # For display table, convert to clickable link
    df_display["WhatsApp"] = df_display["WhatsApp"].apply(
        lambda x: f'<a href="{x}" target="_blank">💬 WhatsApp</a>' if x else ""
    )

    map_html = make_map(df_for_map, center_bbox=bbox)

    elapsed = time.time() - start
    log_msgs.append(f"⏱️ Took {elapsed:.1f}s total.")
    msg = "\n".join(log_msgs)

    # Download link (CSV embedded as a base64 data URI)
    csv_b64 = base64.b64encode(csv_bytes).decode("utf-8")
    csv_href = (
        f'<a href="data:text/csv;base64,{csv_b64}" download="results.csv">'
        "📥 Download CSV</a>"
    )

    return msg, df_display, map_html, csv_href


# -------------------------------------------------------------------
# Build Gradio UI
# -------------------------------------------------------------------
place_options = [
    "cafe", "motel", "hotel", "restaurant", "bar",
    "pub", "bakery", "fast_food", "guest_house", "hostel",
]

with gr.Blocks() as demo:
    gr.Markdown("# 🌍 Hybrid Client Finder (OSM + Google API + Scraper Fallback)")

    with gr.Row():
        with gr.Column(scale=1):
            country = gr.Dropdown(choices=COUNTRY_NAMES, value="United States", label="Country")
            us_names, _ = list_subdivisions("US")
            state = gr.Dropdown(
                choices=us_names,
                value=(us_names[0] if us_names else None),
                label="State / Subdivision",
            )
            update_btn = gr.Button("Refresh states")
            categories = gr.CheckboxGroup(
                place_options, label="Categories", value=["cafe", "restaurant"]
            )
            search_btn = gr.Button("Search")
            info = gr.Textbox(label="Status", interactive=False, lines=5)
            download = gr.HTML(label="Download CSV")
        with gr.Column(scale=2):
            map_html_out = gr.HTML(label="Map")
            table_out = gr.Dataframe(label="Results Table")

    update_btn.click(fn=update_states, inputs=country, outputs=state)
    search_btn.click(
        fn=run_search,
        inputs=[country, state, categories],
        outputs=[info, table_out, map_html_out, download],
    )

if __name__ == "__main__":
    demo.launch()