|
|
import base64
import os
import time

import folium
import gradio as gr
import overpy
import pandas as pd
import pycountry
import requests
from bs4 import BeautifulSoup
from folium.plugins import MarkerCluster
from geopy.geocoders import Nominatim
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Shared geocoding / Overpass clients, created once at import time.
geolocator = Nominatim(user_agent="hf-saas-dashboard")

api = overpy.Overpass()

# SECURITY: never commit API keys to source control -- the key previously
# hard-coded here must be treated as leaked and revoked.  Read the key from
# the environment instead; an empty value disables Google enrichment in
# run_search(), so the app still works without it.
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY", "")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def list_countries():
    """Return parallel lists (names, ISO alpha-2 codes) of all countries, sorted by name."""
    pairs = sorted(((c.name, c.alpha_2) for c in pycountry.countries), key=lambda p: p[0])
    names = [p[0] for p in pairs]
    codes = [p[1] for p in pairs]
    return names, codes
|
|
|
|
|
|
|
|
def list_subdivisions(country_code):
    """Return parallel lists (names, codes) of ISO 3166-2 subdivisions.

    Returns ([], []) when the country has no subdivisions or the code is
    unknown.  pycountry's ``subdivisions.get`` may return None (or raise)
    for an unknown country code; the original passed that straight into
    ``list()`` and crashed with a TypeError.
    """
    try:
        subs = pycountry.subdivisions.get(country_code=country_code)
    except LookupError:
        return [], []
    if not subs:
        return [], []
    items = sorted([(s.name, s.code) for s in subs], key=lambda x: x[0])
    names = [n for n, _ in items]
    codes = [c for _, c in items]
    return names, codes
|
|
|
|
|
|
|
|
def geocode_region(name, country_code):
    """Use geopy + Nominatim to geocode a region and return its bounding box.

    Returns a (south, west, north, east) tuple of floats, or None when the
    region cannot be geocoded or Nominatim supplies no bounding box.
    """
    query = f"{name}, {country_code}"
    # exactly_one=True -> single best match or None; address details are not
    # requested because only the bounding box is consumed below.
    location = geolocator.geocode(query, exactly_one=True, addressdetails=False)
    if not location:
        return None
    if not location.raw.get("boundingbox"):
        return None
    bbox = [float(x) for x in location.raw["boundingbox"]]

    # Nominatim's raw order is [south, north, west, east]; reorder into the
    # (south, west, north, east) convention expected by Overpass bbox queries.
    return (bbox[0], bbox[2], bbox[1], bbox[3])
|
|
|
|
|
|
|
|
def fetch_places(amenities, bbox):
    """Fetch places with given amenities inside a bounding box using overpy.

    Parameters
    ----------
    amenities : list[str]
        OSM ``amenity`` tag values, alternated into a single Overpass regex.
        Values come from a fixed UI choice list, so no regex escaping is done.
    bbox : tuple[float, float, float, float]
        (south, west, north, east) bounding box.

    Returns
    -------
    pandas.DataFrame
        Columns name/amenity/lat/lon/phone/website/osm_id, with rows lacking
        coordinates dropped.  The columns are always present -- the original
        raised KeyError in dropna() when the query matched nothing, because
        ``pd.DataFrame([])`` has no 'lat'/'lon' columns.
    """
    columns = ["name", "amenity", "lat", "lon", "phone", "website", "osm_id"]
    south, west, north, east = bbox
    amen_regex = "|".join(amenities)

    query = f"""
    (
      node["amenity"~"^{amen_regex}$"]({south},{west},{north},{east});
      way["amenity"~"^{amen_regex}$"]({south},{west},{north},{east});
      relation["amenity"~"^{amen_regex}$"]({south},{west},{north},{east});
    );
    out center tags;
    """

    result = api.query(query)

    def parse_tags(tags, lat, lon, osm_id):
        # Flatten an OSM tag dict into one output row; missing tags -> "".
        return {
            "name": tags.get("name", ""),
            "amenity": tags.get("amenity", ""),
            "lat": lat,
            "lon": lon,
            "phone": tags.get("phone", ""),
            "website": tags.get("website", ""),
            "osm_id": osm_id,
        }

    rows = []
    for node in result.nodes:
        rows.append(parse_tags(node.tags, node.lat, node.lon, f"node/{node.id}"))

    # Ways/relations have no point coordinate of their own; "out center" asks
    # the server for one, but overpy only sets center_lat/center_lon when the
    # server actually returned it -- hence getattr with a None fallback.
    for way in result.ways:
        rows.append(parse_tags(way.tags, getattr(way, "center_lat", None),
                               getattr(way, "center_lon", None), f"way/{way.id}"))

    for rel in result.relations:
        rows.append(parse_tags(rel.tags, getattr(rel, "center_lat", None),
                               getattr(rel, "center_lon", None), f"relation/{rel.id}"))

    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows).dropna(subset=["lat", "lon"])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def enrich_with_google_api(df, log_msgs):
    """Enrich OSM results with Google Places API (phone + website).

    Mutates *df* in place (and returns it).  Rows that already have both a
    phone and a website are skipped to save API quota.  Each lookup uses an
    explicit timeout and per-row error handling so one failed request cannot
    hang or abort the entire enrichment pass (the original had neither).
    """
    used = 0
    search_url = "https://maps.googleapis.com/maps/api/place/textsearch/json"
    details_url = "https://maps.googleapis.com/maps/api/place/details/json"

    for i, row in df.iterrows():
        if row["phone"] and row["website"]:
            continue  # already complete

        query = f"{row['name']} near {row['lat']},{row['lon']}"
        try:
            r = requests.get(search_url, params={"query": query, "key": GOOGLE_API_KEY}, timeout=10)
            data = r.json()
        except (requests.RequestException, ValueError):
            continue  # network error or non-JSON body: skip this row

        if data.get("results"):
            place_id = data["results"][0]["place_id"]

            d_params = {
                "place_id": place_id,
                "fields": "formatted_phone_number,website",
                "key": GOOGLE_API_KEY,
            }
            try:
                d = requests.get(details_url, params=d_params, timeout=10).json()
            except (requests.RequestException, ValueError):
                continue

            details = d.get("result", {})
            # Keep the existing value when the details call lacks a field.
            df.at[i, "phone"] = details.get("formatted_phone_number", row["phone"])
            df.at[i, "website"] = details.get("website", row["website"])
            used += 1

    if used > 0:
        log_msgs.append(f"✅ Google API enriched {used} rows.")
    else:
        log_msgs.append("⚠️ Google API did not enrich any rows.")
    return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def scrape_google_maps(name, lat, lon):
    """Scrape the Google Maps web UI for a phone number (last resort, experimental).

    Returns the text of the first <span> containing '+' and at least one
    digit, or None on any network error / non-200 response.  NOTE(review):
    this is fragile by design -- the page is largely JS-rendered and its
    markup changes often, so a None result is the common case.
    """
    search_url = f"https://www.google.com/maps/search/{name}/@{lat},{lon},15z"
    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        # The original had no timeout: a stalled connection hung the whole app.
        r = requests.get(search_url, headers=headers, timeout=10)
    except requests.RequestException:
        return None

    if r.status_code != 200:
        return None

    soup = BeautifulSoup(r.text, "html.parser")
    phone = None
    for span in soup.find_all("span"):
        if "+" in span.text and any(c.isdigit() for c in span.text):
            phone = span.text.strip()
            break
    return phone
|
|
|
|
|
|
|
|
def enrich_with_scraper(df, log_msgs):
    """Fill in missing phone numbers by scraping Google Maps; mutates and returns *df*."""
    hits = 0
    for idx, record in df.iterrows():
        if record["phone"]:
            continue  # already has a phone; nothing to do
        scraped = scrape_google_maps(record["name"], record["lat"], record["lon"])
        if scraped:
            df.at[idx, "phone"] = scraped
            hits += 1

    if hits:
        log_msgs.append(f"✅ Scraper fallback enriched {hits} rows.")
    else:
        log_msgs.append("⚠️ Scraper fallback did not find more phones.")
    return df
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clean_phone_for_whatsapp(phone):
    """Convert a phone number into WhatsApp-friendly form (digits only, '+' kept).

    Returns None for empty/None input or when nothing survives the filtering.
    """
    if not phone:
        return None
    kept = [ch for ch in phone if ch.isdigit() or ch == "+"]
    return "".join(kept) or None
|
|
|
|
|
|
|
|
def df_to_csv_bytes(df):
    """Serialize *df* to UTF-8 CSV bytes, without the index column."""
    csv_text = df.to_csv(index=False)
    return csv_text.encode("utf-8")
|
|
|
|
|
|
|
|
def make_map(df, center_bbox=None):
    """Render *df* rows as a folium marker-cluster map and return its HTML.

    center_bbox, when given, is (south, west, north, east) and centres the
    map on the box midpoint; otherwise the mean of the point coordinates is
    used.  An empty frame yields a zoomed-out world map placeholder.
    """
    if df.empty:
        return folium.Map(location=[20, 0], zoom_start=2)._repr_html_()

    if center_bbox:
        south, west, north, east = center_bbox
        center_lat = (south + north) / 2
        center_lon = (west + east) / 2
    else:
        center_lat = df["lat"].mean()
        center_lon = df["lon"].mean()

    m = folium.Map(location=[center_lat, center_lon], zoom_start=8)
    cluster = MarkerCluster().add_to(m)

    for _, row in df.iterrows():
        # WhatsApp deep link only when a phone number is present.
        wa_link = f"https://wa.me/{clean_phone_for_whatsapp(row['phone'])}" if row['phone'] else None
        popup_html = f"""
        <b>{row['name'] or 'Unnamed Place'}</b><br>
        📞 {row['phone'] if row['phone'] else 'N/A'}<br>
        🌐 <a href="{row['website']}" target="_blank">{row['website'] or 'N/A'}</a><br>
        🍴 {row['amenity']}<br>
        {f'<a href="{wa_link}" target="_blank">💬 WhatsApp</a>' if wa_link else ''}
        """
        folium.Marker(
            [row["lat"], row["lon"]],
            popup=folium.Popup(popup_html, max_width=300),
            tooltip=row["name"] if row["name"] else row["amenity"]
        ).add_to(cluster)

    return m._repr_html_()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Cached parallel country-name / alpha-2-code lists, computed once at import
# time (the ISO country set never changes during a run).
COUNTRY_NAMES, COUNTRY_CODES = list_countries()
|
|
|
|
|
def update_states(selected_country_name):
    """Gradio callback: repopulate the state dropdown for the chosen country."""
    try:
        code = COUNTRY_CODES[COUNTRY_NAMES.index(selected_country_name)]
    except ValueError:
        # Unknown country name: empty the dropdown rather than crash.
        return gr.update(choices=[])
    names, _ = list_subdivisions(code)
    first = names[0] if names else None
    return gr.update(choices=names, value=first)
|
|
|
|
|
|
|
|
def run_search(country_name, state_name, categories):
    """Main pipeline: geocode -> OSM fetch -> enrichment -> map/table/CSV.

    Parameters are the raw Gradio widget values.  Returns a 4-tuple
    (status_message, display_dataframe, map_html, csv_download_html); the
    last three are None whenever the search fails early.
    """
    start = time.time()
    log_msgs = []

    try:
        idx = COUNTRY_NAMES.index(country_name)
        country_code = COUNTRY_CODES[idx]
    except ValueError:
        return "Invalid country", None, None, None

    # Validate cheap inputs before spending a network round-trip on geocoding.
    if not categories:
        return "Please select at least one category.", None, None, None

    # Fall back to the whole country when no subdivision is selected.
    bbox = geocode_region(state_name if state_name else country_name, country_code)
    if bbox is None:
        return f"Could not geocode region '{state_name}'.", None, None, None

    df = fetch_places(categories, bbox)
    log_msgs.append(f"ℹ️ OSM returned {len(df)} places.")

    # Guard: an empty frame has no usable rows/columns for the steps below
    # (the original fell through and could raise on column access).
    if df.empty:
        return "No businesses found in this region.", None, None, None

    # Only call Google when a real key is configured; the literal below is
    # the previously hard-coded (leaked) key, treated as "not configured".
    if GOOGLE_API_KEY and GOOGLE_API_KEY != "AIzaSyD9W7W7nYKRYbtwPm20uVyVr_aW18Y4uiE":
        df = enrich_with_google_api(df, log_msgs)

    df = enrich_with_scraper(df, log_msgs)

    # Keep only rows with a non-blank phone number.
    df = df[df["phone"].notna() & (df["phone"].str.strip() != "")]
    df = df.reset_index(drop=True)

    if df.empty:
        return "No businesses with phone numbers found.", None, None, None

    df["WhatsApp"] = df["phone"].apply(
        lambda x: f"https://wa.me/{clean_phone_for_whatsapp(x)}" if clean_phone_for_whatsapp(x) else ""
    )

    # Map needs lat/lon, which are dropped from the display table below.
    df_for_map = df.copy()

    df_display = df.drop(columns=["lat", "lon", "osm_id"], errors="ignore")
    df_display = df_display[["name", "amenity", "phone", "website", "WhatsApp"]]

    df_display["WhatsApp"] = df_display["WhatsApp"].apply(
        lambda x: f'<a href="{x}" target="_blank">💬 WhatsApp</a>' if x else ""
    )

    map_html = make_map(df_for_map, center_bbox=bbox)
    csv_bytes = df_to_csv_bytes(df_display)

    elapsed = time.time() - start
    log_msgs.append(f"⏱️ Took {elapsed:.1f}s total.")
    msg = "\n".join(log_msgs)

    # Inline data: URI so Gradio can offer the CSV without a temp file.
    csv_b64 = base64.b64encode(csv_bytes).decode("utf-8")
    csv_href = f'<a href="data:text/csv;base64,{csv_b64}" download="results.csv">📥 Download CSV</a>'

    return msg, df_display, map_html, csv_href
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# OSM "amenity" values offered as categories in the UI.
place_options = ["cafe", "motel", "hotel", "restaurant", "bar", "pub", "bakery", "fast_food", "guest_house", "hostel"]

with gr.Blocks() as demo:
    gr.Markdown("# 🌍 Hybrid Client Finder (OSM + Google API + Scraper Fallback)")

    with gr.Row():
        with gr.Column(scale=1):
            # Defaults to the US with its subdivisions pre-loaded at build time.
            country = gr.Dropdown(choices=COUNTRY_NAMES, value="United States", label="Country")
            us_names, _ = list_subdivisions("US")
            state = gr.Dropdown(choices=us_names, value=(us_names[0] if us_names else None), label="State / Subdivision")

            # State refresh is triggered manually via this button rather than
            # by a country.change event.
            update_btn = gr.Button("Refresh states")
            categories = gr.CheckboxGroup(place_options, label="Categories", value=["cafe", "restaurant"])
            search_btn = gr.Button("Search")

            info = gr.Textbox(label="Status", interactive=False, lines=5)
            download = gr.HTML(label="Download CSV")

        with gr.Column(scale=2):
            map_html_out = gr.HTML(label="Map")
            table_out = gr.Dataframe(label="Results Table")

    # Event wiring: button -> callback -> output components.
    update_btn.click(fn=update_states, inputs=country, outputs=state)
    search_btn.click(fn=run_search, inputs=[country, state, categories], outputs=[info, table_out, map_html_out, download])

if __name__ == "__main__":
    demo.launch()
|
|
|