# Hugging Face Spaces status residue from the page scrape ("Spaces: Sleeping");
# kept as a comment so the file remains valid Python.
| import gradio as gr | |
| import googlemaps | |
| import osmnx as ox | |
| import geopandas as gpd | |
| import pandas as pd | |
| import requests | |
| import zipfile | |
| import os | |
| import glob | |
| import shutil | |
| import time | |
| from shapely.geometry import Point, Polygon | |
| from shapely.ops import transform | |
# ==========================================
# AUTHENTICATION
# ==========================================
# os.environ.get returns None when the variable is absent and never raises,
# so the former bare try/except around it was dead weight — and a bare
# except would have masked real errors (including KeyboardInterrupt).
API_KEY = os.environ.get("GOOGLE_API_KEY")
# ==========================================
# 1. UNIVERSAL FILTER LISTS (FINAL COMPLETE VERSION)
# ==========================================
# Filters out Schools, Doctors, Parking Lots, Repairs, etc.
# Substring blacklist checked against the lowercased place name: any match
# drops the Places result entirely (see the search loop in process_data).
UNIVERSAL_BAD_TERMS = [
    # Health / Medical
    "dr.", "dds", "md", "phd", "lcsw", "medical", "clinic", "health", "rehab",
    "therapy", "counseling", "chiropractor", "dental", "orthodontics", "hospital",
    "ambulance", "transport", "emergency", "veterinary", "vision center",
    "spinal cord", "urgent care", "hellomed", "spine", "program",
    # Education
    "school", "university", "college", "academy", "campus", "library", "learning",
    "student", "alum", "education", "institute", "dorm", "residence",
    # Services / Misc
    "atm", "kiosk", "redbox", "coinme", "fuel", "gas", "repair", "service",
    "collision", "towing", "plumbing", "hvac", "electric", "tree", "lawn",
    "gutter", "cleaning", "storage", "warehouse", "distribution", "mural", "statue",
    "part", "accessories", "hair", "salon", "studio", "barber", "spa", "nail",
    "diamonds", "jewelers", "pllc", "llc", "parking", "drive", "cooling", "heating",
    "brandy", "bike shop", "grooming"
]

# Filters out departments inside Big Box stores (Fixes Area Splitting)
# Applied case-insensitively in post-processing to drop in-store departments
# ("Target Grocery", "Meijer Deli", ...) that Places returns as separate
# results. NOTE(review): joined with "|" into a regex — terms must stay free
# of regex metacharacters.
DEPARTMENT_TERMS = [
    "grocery", "deli", "bakery", "pharmacy", "optical", "hearing", "photo",
    "portrait", "garden", "nursery", "mobile", "tech", "geek", "pickup",
    "money", "bank", "cafe", "bistro", "snack", "food court", "customer service",
    "floral", "flowers", "store on", "tire", "battery", "auto", "lube",
    "credit union", "sephora", "sunglass", "finish line", "pro desk",
    "rental center", "svc drive", "inside", "at ",
    "dog training"
]
# ==========================================
# 2. COMPREHENSIVE STORE LIST
# ==========================================
# One Places "nearby" keyword search is issued per brand in this list.
SEARCH_LIST = [
    # Big Box / Dept
    "Walmart", "Target", "Kmart", "Sears", "Kohl's", "Macy's", "JCPenney",
    "Nordstrom", "Costco", "Sam's Club", "BJ's Wholesale Club",
    # Clothing / Discount
    "TJX", "T.J. Maxx", "Marshalls", "HomeGoods", "HomeSense",
    "Ross Dress for Less", "Burlington", "Old Navy", "DSW Designer Shoe Warehouse",
    # Home Imp / Hardware / Furniture
    "Home Depot", "Lowe's", "Ace Hardware", "Menards",
    "IKEA", "Bob's Discount Furniture", "Raymour & Flanigan", "Ashley Furniture",
    # Electronics / Office
    "Best Buy", "Office Depot", "OfficeMax", "Staples",
    # Grocery
    "Kroger", "Meijer", "Whole Foods", "Trader Joe's", "Aldi", "Lidl",
    "Safeway", "Albertsons", "ShopRite", "Stop & Shop", "Publix", "Wegmans",
    # Hobbies / Pets / Sporting
    "Dick's Sporting Goods", "Bass Pro Shops", "Cabela's", "REI",
    "Michaels", "Hobby Lobby", "Barnes & Noble",
    "PetSmart", "Petco"
]

# Fallback floor count per brand, used when OSM has no "building:levels" tag
# (brands missing from this dict default to 1 floor in process_data).
BRAND_FLOORS = {
    "Macy's": 2, "JCPenney": 2, "Nordstrom": 2, "Sears": 2, "IKEA": 2,
    "Target": 1, "Walmart": 1, "Costco": 1, "Home Depot": 1, "Lowe's": 1,
    "Barnes & Noble": 1, "Dick's Sporting Goods": 1, "Kohl's": 1
}

# Fallback roof area per brand in square meters, used when the Solar API
# returns no data or an implausible value (mall roof / kiosk); brands missing
# from this dict default to 2500.0 m² in process_data.
BRAND_AVG_AREA = {
    "IKEA": 28000, "Walmart": 15000, "Costco": 14000, "Sam's Club": 13000,
    "Meijer": 18000, "Target": 12000, "Home Depot": 10000, "Lowe's": 10000,
    "Kroger": 6000, "Safeway": 5000, "Whole Foods": 4000, "Macy's": 16000,
    "JCPenney": 10000, "Sears": 12000, "Kohl's": 8000, "Dick's Sporting Goods": 4500,
    "T.J. Maxx": 2800, "Marshalls": 2800, "Ross Dress for Less": 2800,
    "Old Navy": 1400, "Barnes & Noble": 2500, "Best Buy": 3500, "Staples": 2000,
    "Office Depot": 2000, "PetSmart": 1800, "Petco": 1400, "Trader Joe's": 1200,
    "Aldi": 1500, "Lidl": 1500, "Ace Hardware": 800, "DSW Designer Shoe Warehouse": 2000,
    "Hobby Lobby": 5000, "BJ's Wholesale Club": 10000, "REI": 4000
}
| # ========================================== | |
| # HELPER FUNCTIONS | |
| # ========================================== | |
def load_geodata_to_polygon(file_obj):
    """Read an uploaded KML or KMZ file and return its merged geometry.

    Args:
        file_obj: Uploaded file object (e.g. gradio file); only ``.name``
            (the on-disk path) is used.

    Returns:
        A shapely geometry that is the union of all features in the file,
        or None when the file is not KML/KMZ or cannot be parsed.
    """
    extract_path = "temp_extract"
    # Start from a clean scratch directory for KMZ extraction.
    if os.path.exists(extract_path):
        shutil.rmtree(extract_path)
    os.makedirs(extract_path)
    target_kml = None
    try:
        # HANDLING KML AND KMZ HERE: a KMZ is just a zip containing a KML.
        lower_name = file_obj.name.lower()
        if lower_name.endswith('.kmz'):
            with zipfile.ZipFile(file_obj.name, 'r') as zip_ref:
                zip_ref.extractall(extract_path)
            kml_files = glob.glob(extract_path + "/**/*.kml", recursive=True)
            if kml_files:
                target_kml = kml_files[0]
        elif lower_name.endswith('.kml'):
            target_kml = file_obj.name
        if target_kml:
            gdf = gpd.read_file(target_kml)

            # FORCE 2D FIX (Prevents crashes on 3D KMLs): strip the Z
            # coordinate so downstream reprojection does not choke.
            def force_2d(geometry):
                if geometry.has_z:
                    return transform(lambda x, y, z=None: (x, y), geometry)
                return geometry

            gdf.geometry = gdf.geometry.apply(force_2d)
            return gdf.union_all()
    except Exception:
        # Narrowed from a bare except: any zip/parse failure still yields
        # None (the caller shows a user-facing error), but KeyboardInterrupt
        # and SystemExit now propagate.
        return None
    return None
def get_roof_area(lat, lng, api_key):
    """Query the Google Solar API for the whole-roof area at a point.

    Args:
        lat, lng: WGS84 coordinates of the building.
        api_key: Google API key with Solar API enabled.

    Returns:
        Roof area in square meters (as reported by buildingInsights), or
        None when the API has no HIGH-quality data, returns an error
        payload, or the request fails.
    """
    base_url = "https://solar.googleapis.com/v1/buildingInsights:findClosest"
    params = {"location.latitude": lat, "location.longitude": lng, "requiredQuality": "HIGH", "key": api_key}
    try:
        # timeout added: without one a stalled connection would hang the
        # whole scan indefinitely.
        resp = requests.get(base_url, params=params, timeout=30)
        data = resp.json()
        if 'error' in data:
            return None
        return data.get('solarPotential', {}).get('wholeRoofStats', {}).get('areaMeters2', None)
    except (requests.RequestException, ValueError):
        # ValueError covers non-JSON response bodies; narrowed from a bare
        # except so interrupts are no longer swallowed.
        return None
def get_osm_physics(lat, lng):
    """Look up building height/floor metadata from OpenStreetMap.

    Takes the largest building footprint within 60 m of (lat, lng) and
    reads its "height" and "building:levels" tags.

    Returns:
        (height_m, floors) — either element may be None when OSM lacks the
        tag, the tag is unparseable, or the lookup fails entirely.
    """
    try:
        tags = {'building': True}
        gdf = ox.features.features_from_point((lat, lng), tags, dist=60)
        if not gdf.empty:
            # Project to a metric CRS so .area is comparable; the largest
            # polygon is taken as the store building itself.
            gdf_proj = gdf.to_crs(epsg=3857)
            gdf_proj['area_m2'] = gdf_proj.geometry.area
            best = gdf_proj.sort_values(by='area_m2', ascending=False).iloc[0]

            floors = None
            if 'building:levels' in best and pd.notna(best['building:levels']):
                # The tag may hold multi-values like "2;3"; take the first.
                try:
                    floors = int(float(str(best['building:levels']).split(';')[0]))
                except (ValueError, TypeError):
                    pass

            height = None
            if 'height' in best and pd.notna(best['height']):
                # Tag values are often written as e.g. "12 m".
                try:
                    height = float(str(best['height']).replace('m', '').strip())
                except (ValueError, TypeError):
                    pass

            return height, floors
    except Exception:
        # OSM/Overpass lookups fail for many transient reasons; this helper
        # is best-effort by design (narrowed from a bare except).
        pass
    return None, None
| # ========================================== | |
| # MAIN LOGIC | |
| # ========================================== | |
def process_data(file_obj):
    """Gradio generator: scan an uploaded KML/KMZ polygon for big-box stores.

    Yields (status_message, output) tuples so the UI can stream progress;
    the final yield carries the path of the exported CSV (or None on error).

    Pipeline: load polygon -> enforce 250 km² limit -> one Places keyword
    search per brand (with pagination) -> name/containment filters -> enrich
    with Solar API roof area + OSM height/floors -> dedupe -> split shared
    roofs -> write CSV.
    """
    # --- Preconditions ---------------------------------------------------
    if not API_KEY:
        yield "❌ API Key not found! Set GOOGLE_API_KEY in Secrets.", None
        return
    if file_obj is None:
        yield "❌ Please upload a file.", None
        return

    yield "📂 Loading Polygon...", None
    polygon = load_geodata_to_polygon(file_obj)
    if not polygon:
        yield "❌ Failed to read KML/KMZ file.", None
        return

    # --- CHECK AREA LIMIT HERE (250,000,000 sq m) ------------------------
    # EPSG:6933 is an equal-area projection, so .area is in square meters.
    gs = gpd.GeoSeries([polygon], crs="EPSG:4326")
    gs_proj = gs.to_crs(epsg=6933)
    area_sq_meters = gs_proj.area.iloc[0]
    if area_sq_meters > 250_000_000:
        yield f"⚠️ AREA TOO LARGE: {area_sq_meters:,.0f} sq m. (Limit: 250M).", None
        return

    gmaps = googlemaps.Client(key=API_KEY)
    results = []
    seen_ids = set()
    total_brands = len(SEARCH_LIST)
    # Hoisted: the centroid is loop-invariant, so compute it once instead of
    # re-deriving it on every brand and every pagination request.
    centroid = polygon.centroid
    search_location = (centroid.y, centroid.x)

    # 1. SEARCH LOOP ------------------------------------------------------
    for i, brand in enumerate(SEARCH_LIST):
        yield f"🔍 Scanning Brand {i+1}/{total_brands}: {brand}...", None
        try:
            places = gmaps.places_nearby(
                location=search_location,
                radius=10000,
                keyword=brand
            )
            all_results = places.get('results', [])
            # Follow pagination; Google requires a short delay before a
            # next_page_token becomes valid.
            while 'next_page_token' in places:
                time.sleep(2)
                places = gmaps.places_nearby(
                    location=search_location,
                    radius=10000,
                    keyword=brand,
                    page_token=places['next_page_token']
                )
                all_results.extend(places.get('results', []))

            for p in all_results:
                pid = p.get('place_id')
                if pid in seen_ids:
                    continue
                name = p.get('name')
                # Strip apostrophes/periods so "Lowe's" matches "Lowes" etc.
                name_clean = name.lower().replace("'", "").replace(".", "")
                brand_clean = brand.lower().replace("'", "").replace(".", "")

                # A. UNIVERSAL NAME CHECK
                if brand_clean not in name_clean:
                    # BUGFIX: name_clean has periods/apostrophes removed, so
                    # the old comparisons against "t.j. maxx" / "lowe's"
                    # could never match; compare stripped spellings instead.
                    if brand_clean == "tjx" and "tj maxx" in name_clean:
                        pass
                    elif brand_clean == "lowes" and "lowes" in name_clean:
                        pass
                    else:
                        continue

                # B. UNIVERSAL BAD WORD FILTER (Strict)
                if any(term in name_clean for term in UNIVERSAL_BAD_TERMS):
                    continue

                lat = p['geometry']['location']['lat']
                lng = p['geometry']['location']['lng']
                # C. STRICT CONTAINMENT — shapely Point takes (x=lng, y=lat).
                if not polygon.contains(Point(lng, lat)):
                    continue
                seen_ids.add(pid)

                # FETCH DATA
                roof_area = get_roof_area(lat, lng, API_KEY)
                height, floors = get_osm_physics(lat, lng)

                # DATA FILLING
                source_note = "SolarAPI"
                if roof_area is None:
                    roof_area = BRAND_AVG_AREA.get(brand, 2500.0)
                    source_note = "Brand_Avg (Missing)"
                else:
                    # Universal Mall Logic: implausibly large footprints are
                    # usually the whole mall roof; implausibly small ones a
                    # kiosk. Fall back to the brand average either way.
                    if roof_area > 30000 and brand not in ["IKEA", "Costco", "Meijer", "Sam's Club", "Walmart"]:
                        roof_area = BRAND_AVG_AREA.get(brand, 2500.0)
                        source_note = "Brand_Avg (Mall detected)"
                    elif roof_area < 500 and brand not in ["Ace Hardware", "Trader Joe's"]:
                        roof_area = BRAND_AVG_AREA.get(brand, 2500.0)
                        source_note = "Brand_Avg (Too Small detected)"

                if floors is None:
                    floors = BRAND_FLOORS.get(brand, 1)
                if height is None:
                    # Rough estimate: ~6 m per floor for commercial builds.
                    height = floors * 6.0

                results.append({
                    'Name': name,
                    'Brand': brand,
                    'Latitude': lat,
                    'Longitude': lng,
                    'Height_m': round(height, 2),
                    'Num_Floors': int(floors),
                    'Area_sqm': round(roof_area, 2),
                    'Data_Source': source_note
                })
        except Exception:
            # Best-effort per brand: one failed brand query must not abort
            # the whole scan. Narrowed from a bare except so interrupts
            # still propagate.
            pass

    if not results:
        yield "❌ No stores found in this area.", None
        return

    # ==========================================
    # 2. UNIVERSAL POST-PROCESSING
    # ==========================================
    yield "🧹 Performing Universal Deduplication...", None
    df = pd.DataFrame(results)

    # A. Remove Departments (Target Grocery, Meijer Deli, Kroger Floral)
    df = df[~df['Name'].str.contains('|'.join(DEPARTMENT_TERMS), case=False, na=False)]

    # B. Spatial Deduplication (Group by Brand + Location)
    # Rounding to 4 decimal places creates a grid ID of approx 11 meters.
    df['Loc_ID'] = df['Latitude'].round(4).astype(str) + "_" + df['Longitude'].round(4).astype(str)
    # Sort by Name Length (Shortest name usually "Target", longest usually "Target Grocery ...")
    df['Name_Len'] = df['Name'].str.len()
    df = df.sort_values(by=['Brand', 'Loc_ID', 'Name_Len'])
    # Drop duplicates, keeping the shortest name
    df = df.drop_duplicates(subset=['Brand', 'Loc_ID'], keep='first')
    df = df.drop(columns=['Loc_ID', 'Name_Len'])

    # C. Universal Strip Mall Splitter: identical Solar API roof areas mean
    # several tenants share one roof, so split the area evenly among them.
    df['Tenant_Count'] = df.groupby('Area_sqm')['Area_sqm'].transform('count')
    df['Final_Area_sqm'] = df.apply(
        lambda x: x['Area_sqm'] / x['Tenant_Count'] if x['Tenant_Count'] > 1 and x['Area_sqm'] > 5000 else x['Area_sqm'],
        axis=1
    )
    df['Data_Source'] = df.apply(
        lambda x: x['Data_Source'] + f" (Split w/ {x['Tenant_Count']-1} tenants)" if x['Tenant_Count'] > 1 and x['Area_sqm'] > 5000 else x['Data_Source'],
        axis=1
    )

    # Clean Export
    final_cols = ['Name', 'Brand', 'Latitude', 'Longitude', 'Height_m', 'Num_Floors', 'Final_Area_sqm', 'Data_Source']
    df_final = df[final_cols].rename(columns={'Final_Area_sqm': 'Area_sqm'})
    output_path = "Universal_Building_Inventory.csv"
    df_final.to_csv(output_path, index=False)
    yield f"✅ Success! Found {len(df_final)} unique commercial assets.", output_path
# ==========================================
# GRADIO INTERFACE
# ==========================================
# process_data is a generator, so the Textbox streams progress updates and
# the File output receives the CSV on the final yield.
iface = gr.Interface(
    fn=process_data,
    # Label fixed: was the garbled "- - Limit 250, Sq KM area"; the limit
    # enforced in process_data is 250,000,000 sq m (= 250 sq km).
    inputs=gr.File(label="Upload Polygon (KML/KMZ) - Limit: 250 sq km area"),
    outputs=[
        gr.Textbox(label="Status Log"),
        gr.File(label="Download CSV")
    ],
    title="🌎 Universal Commercial Building Scanner - Test Phase - For Nokia",
    description="Upload any KML/KMZ. Scans for 50+ Big Box Brands, get Area Height/floors. Using Places API, Solar API and OpenStreetMaps API"
)
iface.launch()