forever-sheikh's picture
Update app.py
64c7e5b verified
import gradio as gr
import googlemaps
import osmnx as ox
import geopandas as gpd
import pandas as pd
import requests
import zipfile
import os
import glob
import shutil
import time
from shapely.geometry import Point, Polygon
from shapely.ops import transform
# ==========================================
# AUTHENTICATION
# ==========================================
try:
API_KEY = os.environ.get("GOOGLE_API_KEY")
except:
API_KEY = None
# ==========================================
# 1. UNIVERSAL FILTER LISTS (FINAL COMPLETE VERSION)
# ==========================================
# Filters out Schools, Doctors, Parking Lots, Repairs, etc.
UNIVERSAL_BAD_TERMS = [
# Health / Medical
"dr.", "dds", "md", "phd", "lcsw", "medical", "clinic", "health", "rehab",
"therapy", "counseling", "chiropractor", "dental", "orthodontics", "hospital",
"ambulance", "transport", "emergency", "veterinary", "vision center",
"spinal cord", "urgent care", "hellomed", "spine", "program",
# Education
"school", "university", "college", "academy", "campus", "library", "learning",
"student", "alum", "education", "institute", "dorm", "residence",
# Services / Misc
"atm", "kiosk", "redbox", "coinme", "fuel", "gas", "repair", "service",
"collision", "towing", "plumbing", "hvac", "electric", "tree", "lawn",
"gutter", "cleaning", "storage", "warehouse", "distribution", "mural", "statue",
"part", "accessories", "hair", "salon", "studio", "barber", "spa", "nail",
"diamonds", "jewelers", "pllc", "llc", "parking", "drive", "cooling", "heating",
"brandy", "bike shop", "grooming"
]
# Filters out departments inside Big Box stores (Fixes Area Splitting)
DEPARTMENT_TERMS = [
"grocery", "deli", "bakery", "pharmacy", "optical", "hearing", "photo",
"portrait", "garden", "nursery", "mobile", "tech", "geek", "pickup",
"money", "bank", "cafe", "bistro", "snack", "food court", "customer service",
"floral", "flowers", "store on", "tire", "battery", "auto", "lube",
"credit union", "sephora", "sunglass", "finish line", "pro desk",
"rental center", "svc drive", "inside", "at ",
"dog training"
]
# ==========================================
# 2. COMPREHENSIVE STORE LIST
# ==========================================
SEARCH_LIST = [
# Big Box / Dept
"Walmart", "Target", "Kmart", "Sears", "Kohl's", "Macy's", "JCPenney",
"Nordstrom", "Costco", "Sam's Club", "BJ's Wholesale Club",
# Clothing / Discount
"TJX", "T.J. Maxx", "Marshalls", "HomeGoods", "HomeSense",
"Ross Dress for Less", "Burlington", "Old Navy", "DSW Designer Shoe Warehouse",
# Home Imp / Hardware / Furniture
"Home Depot", "Lowe's", "Ace Hardware", "Menards",
"IKEA", "Bob's Discount Furniture", "Raymour & Flanigan", "Ashley Furniture",
# Electronics / Office
"Best Buy", "Office Depot", "OfficeMax", "Staples",
# Grocery
"Kroger", "Meijer", "Whole Foods", "Trader Joe's", "Aldi", "Lidl",
"Safeway", "Albertsons", "ShopRite", "Stop & Shop", "Publix", "Wegmans",
# Hobbies / Pets / Sporting
"Dick's Sporting Goods", "Bass Pro Shops", "Cabela's", "REI",
"Michaels", "Hobby Lobby", "Barnes & Noble",
"PetSmart", "Petco"
]
BRAND_FLOORS = {
"Macy's": 2, "JCPenney": 2, "Nordstrom": 2, "Sears": 2, "IKEA": 2,
"Target": 1, "Walmart": 1, "Costco": 1, "Home Depot": 1, "Lowe's": 1,
"Barnes & Noble": 1, "Dick's Sporting Goods": 1, "Kohl's": 1
}
BRAND_AVG_AREA = {
"IKEA": 28000, "Walmart": 15000, "Costco": 14000, "Sam's Club": 13000,
"Meijer": 18000, "Target": 12000, "Home Depot": 10000, "Lowe's": 10000,
"Kroger": 6000, "Safeway": 5000, "Whole Foods": 4000, "Macy's": 16000,
"JCPenney": 10000, "Sears": 12000, "Kohl's": 8000, "Dick's Sporting Goods": 4500,
"T.J. Maxx": 2800, "Marshalls": 2800, "Ross Dress for Less": 2800,
"Old Navy": 1400, "Barnes & Noble": 2500, "Best Buy": 3500, "Staples": 2000,
"Office Depot": 2000, "PetSmart": 1800, "Petco": 1400, "Trader Joe's": 1200,
"Aldi": 1500, "Lidl": 1500, "Ace Hardware": 800, "DSW Designer Shoe Warehouse": 2000,
"Hobby Lobby": 5000, "BJ's Wholesale Club": 10000, "REI": 4000
}
# ==========================================
# HELPER FUNCTIONS
# ==========================================
def load_geodata_to_polygon(file_obj):
extract_path = "temp_extract"
if os.path.exists(extract_path):
shutil.rmtree(extract_path)
os.makedirs(extract_path)
target_kml = None
try:
# HANDLING KML AND KMZ HERE
if file_obj.name.lower().endswith('.kmz'):
with zipfile.ZipFile(file_obj.name, 'r') as zip_ref:
zip_ref.extractall(extract_path)
kml_files = glob.glob(extract_path + "/**/*.kml", recursive=True)
if kml_files:
target_kml = kml_files[0]
elif file_obj.name.lower().endswith('.kml'):
target_kml = file_obj.name
if target_kml:
gdf = gpd.read_file(target_kml)
# FORCE 2D FIX (Prevents crashes on 3D KMLs)
def force_2d(geometry):
if geometry.has_z:
return transform(lambda x, y, z=None: (x, y), geometry)
return geometry
gdf.geometry = gdf.geometry.apply(force_2d)
return gdf.union_all()
except:
return None
return None
def get_roof_area(lat, lng, api_key):
base_url = "https://solar.googleapis.com/v1/buildingInsights:findClosest"
params = {"location.latitude": lat, "location.longitude": lng, "requiredQuality": "HIGH", "key": api_key}
try:
resp = requests.get(base_url, params=params)
data = resp.json()
if 'error' in data: return None
return data.get('solarPotential', {}).get('wholeRoofStats', {}).get('areaMeters2', None)
except:
return None
def get_osm_physics(lat, lng):
try:
tags = {'building': True}
gdf = ox.features.features_from_point((lat, lng), tags, dist=60)
if not gdf.empty:
gdf_proj = gdf.to_crs(epsg=3857)
gdf_proj['area_m2'] = gdf_proj.geometry.area
best = gdf_proj.sort_values(by='area_m2', ascending=False).iloc[0]
floors = None
if 'building:levels' in best and pd.notna(best['building:levels']):
try: floors = int(float(str(best['building:levels']).split(';')[0]))
except: pass
height = None
if 'height' in best and pd.notna(best['height']):
try: height = float(str(best['height']).replace('m','').strip())
except: pass
return height, floors
except:
pass
return None, None
# ==========================================
# MAIN LOGIC
# ==========================================
def process_data(file_obj):
if not API_KEY:
yield "❌ API Key not found! Set GOOGLE_API_KEY in Secrets.", None
return
if file_obj is None:
yield "❌ Please upload a file.", None
return
yield "📂 Loading Polygon...", None
polygon = load_geodata_to_polygon(file_obj)
if not polygon:
yield "❌ Failed to read KML/KMZ file.", None
return
# --- CHECK AREA LIMIT HERE (250,000,000 sq m) ---
gs = gpd.GeoSeries([polygon], crs="EPSG:4326")
gs_proj = gs.to_crs(epsg=6933)
area_sq_meters = gs_proj.area.iloc[0]
if area_sq_meters > 250_000_000:
yield f"⚠️ AREA TOO LARGE: {area_sq_meters:,.0f} sq m. (Limit: 250M).", None
return
gmaps = googlemaps.Client(key=API_KEY)
results = []
seen_ids = set()
total_brands = len(SEARCH_LIST)
# 1. SEARCH LOOP
for i, brand in enumerate(SEARCH_LIST):
yield f"🔍 Scanning Brand {i+1}/{total_brands}: {brand}...", None
try:
places = gmaps.places_nearby(
location=(polygon.centroid.y, polygon.centroid.x),
radius=10000,
keyword=brand
)
all_results = places.get('results', [])
while 'next_page_token' in places:
time.sleep(2)
places = gmaps.places_nearby(
location=(polygon.centroid.y, polygon.centroid.x),
radius=10000,
keyword=brand,
page_token=places['next_page_token']
)
all_results.extend(places.get('results', []))
for p in all_results:
pid = p.get('place_id')
if pid in seen_ids: continue
name = p.get('name')
name_clean = name.lower().replace("'", "").replace(".", "")
brand_clean = brand.lower().replace("'", "").replace(".", "")
# A. UNIVERSAL NAME CHECK
if brand_clean not in name_clean:
if brand_clean == "tjx" and "t.j. maxx" in name_clean: pass
elif brand_clean == "lowe" and "lowe's" in name_clean: pass
else: continue
# B. UNIVERSAL BAD WORD FILTER (Strict)
if any(term in name_clean for term in UNIVERSAL_BAD_TERMS): continue
lat = p['geometry']['location']['lat']
lng = p['geometry']['location']['lng']
# C. STRICT CONTAINMENT
if not polygon.contains(Point(lng, lat)): continue
seen_ids.add(pid)
# FETCH DATA
roof_area = get_roof_area(lat, lng, API_KEY)
height, floors = get_osm_physics(lat, lng)
# DATA FILLING
source_note = "SolarAPI"
if roof_area is None:
roof_area = BRAND_AVG_AREA.get(brand, 2500.0)
source_note = "Brand_Avg (Missing)"
else:
# Universal Mall Logic
if roof_area > 30000 and brand not in ["IKEA", "Costco", "Meijer", "Sam's Club", "Walmart"]:
roof_area = BRAND_AVG_AREA.get(brand, 2500.0)
source_note = "Brand_Avg (Mall detected)"
elif roof_area < 500 and brand not in ["Ace Hardware", "Trader Joe's"]:
roof_area = BRAND_AVG_AREA.get(brand, 2500.0)
source_note = "Brand_Avg (Too Small detected)"
if floors is None: floors = BRAND_FLOORS.get(brand, 1)
if height is None: height = floors * 6.0
results.append({
'Name': name,
'Brand': brand,
'Latitude': lat,
'Longitude': lng,
'Height_m': round(height, 2),
'Num_Floors': int(floors),
'Area_sqm': round(roof_area, 2),
'Data_Source': source_note
})
except:
pass
if not results:
yield "❌ No stores found in this area.", None
return
# ==========================================
# 2. UNIVERSAL POST-PROCESSING
# ==========================================
yield "🧹 Performing Universal Deduplication...", None
df = pd.DataFrame(results)
# A. Remove Departments (Target Grocery, Meijer Deli, Kroger Floral)
df = df[~df['Name'].str.contains('|'.join(DEPARTMENT_TERMS), case=False, na=False)]
# B. Spatial Deduplication (Group by Brand + Location)
# Creates a grid ID approx 11 meters.
df['Loc_ID'] = df['Latitude'].round(4).astype(str) + "_" + df['Longitude'].round(4).astype(str)
# Sort by Name Length (Shortest name usually "Target", longest usually "Target Grocery ...")
df['Name_Len'] = df['Name'].str.len()
df = df.sort_values(by=['Brand', 'Loc_ID', 'Name_Len'])
# Drop duplicates, keeping the shortest name
df = df.drop_duplicates(subset=['Brand', 'Loc_ID'], keep='first')
df = df.drop(columns=['Loc_ID', 'Name_Len'])
# C. Universal Strip Mall Splitter
df['Tenant_Count'] = df.groupby('Area_sqm')['Area_sqm'].transform('count')
df['Final_Area_sqm'] = df.apply(
lambda x: x['Area_sqm'] / x['Tenant_Count'] if x['Tenant_Count'] > 1 and x['Area_sqm'] > 5000 else x['Area_sqm'],
axis=1
)
df['Data_Source'] = df.apply(
lambda x: x['Data_Source'] + f" (Split w/ {x['Tenant_Count']-1} tenants)" if x['Tenant_Count'] > 1 and x['Area_sqm'] > 5000 else x['Data_Source'],
axis=1
)
# Clean Export
final_cols = ['Name', 'Brand', 'Latitude', 'Longitude', 'Height_m', 'Num_Floors', 'Final_Area_sqm', 'Data_Source']
df_final = df[final_cols].rename(columns={'Final_Area_sqm': 'Area_sqm'})
output_path = "Universal_Building_Inventory.csv"
df_final.to_csv(output_path, index=False)
yield f"✅ Success! Found {len(df_final)} unique commercial assets.", output_path
# ==========================================
# GRADIO INTERFACE
# ==========================================
iface = gr.Interface(
fn=process_data,
inputs=gr.File(label="Upload Polygon (KML/KMZ) - - Limit 250, Sq KM area"),
outputs=[
gr.Textbox(label="Status Log"),
gr.File(label="Download CSV")
],
title="🌎 Universal Commercial Building Scanner - Test Phase - For Nokia",
description="Upload any KML/KMZ. Scans for 50+ Big Box Brands, get Area Height/floors. Using Places API, Solar API and OpenStreetMaps API"
)
iface.launch()