# -*- coding: utf-8 -*-
"""Updated_proto_KDE_saving.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1FaE0wh8yJYv3lxVbhyN4r9eHUNBHWAOX
"""

# ==============================================================================
# CELL 1: SETUP AND CONSOLIDATED IMPORTS
# ==============================================================================
import gradio as gr
import os
import json
import math
import uuid
import shutil
import zipfile
import pathlib
import tempfile
import pandas as pd
import PIL.Image
from datetime import datetime
import huggingface_hub
import autogluon.multimodal
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.cm as cm
import matplotlib.colors
import folium
from scipy.stats import gaussian_kde
from datasets import load_dataset
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter

# ==============================================================================
# CELL 2: CORE LOGIC FOR TAB 1
# ==============================================================================

# --- Functions for Data Capture ---


def get_current_time():
    """Return the current local time as an ISO-8601 string."""
    return datetime.now().isoformat()


def handle_time_capture():
    """Capture the current time.

    Returns:
        tuple[str, str]: (markdown status message, raw ISO timestamp).
    """
    timestamp = get_current_time()
    status_msg = f"🕐 **Time Captured**: {timestamp}"
    return status_msg, timestamp


def get_gps_js():
    """Return the browser-side JS snippet that fills the GPS textboxes.

    The snippet reads the device position via the Geolocation API and writes
    lat/lon/accuracy/timestamp into the Gradio textboxes addressed by the
    #lat / #lon / #accuracy / #device_ts element ids, dispatching 'input'
    events so Gradio registers the programmatic value changes.
    """
    return """
    () => {
        if (!navigator.geolocation) {
            alert("Geolocation not supported");
            return;
        }
        navigator.geolocation.getCurrentPosition(
            function(position) {
                const latBox = document.querySelector('#lat textarea');
                const lonBox = document.querySelector('#lon textarea');
                const accuracyBox = document.querySelector('#accuracy textarea');
                const timestampBox = document.querySelector('#device_ts textarea');
                if (latBox && lonBox && accuracyBox && timestampBox) {
                    latBox.value = position.coords.latitude.toString();
                    lonBox.value = position.coords.longitude.toString();
                    accuracyBox.value = position.coords.accuracy.toString();
                    timestampBox.value = new Date().toISOString();
                    latBox.dispatchEvent(new Event('input', { bubbles: true }));
                    lonBox.dispatchEvent(new Event('input', { bubbles: true }));
                    accuracyBox.dispatchEvent(new Event('input', { bubbles: true }));
                    timestampBox.dispatchEvent(new Event('input', { bubbles: true }));
                } else {
                    alert("Error: Could not find GPS input fields");
                }
            },
            function(err) {
                alert("GPS Error: " + err.message);
            },
            { enableHighAccuracy: true, timeout: 10000 }
        );
    }
    """


def save_to_dataset(image, lat, lon, accuracy_m, device_ts):
    """Mock save handler: echoes the captured metadata without persisting it.

    Args:
        image: PIL image (or None) from the Gradio image component.
        lat, lon, accuracy_m, device_ts: GPS metadata strings from the UI.

    Returns:
        tuple[str, str]: (markdown status message, JSON preview of the record).
    """
    if image is None:
        return "❌ **Error**: Please capture or upload a photo first.", ""
    mock_data = {
        "image": "image.jpg",
        "latitude": lat,
        "longitude": lon,
        "accuracy_m": accuracy_m,
        "device_timestamp": device_ts,
        "status": "Saving Disabled",
    }
    status = "✅ **Test Save Successful!** (No data saved)"
    return status, json.dumps(mock_data, indent=2)


# Aliases kept for backward compatibility with existing UI wiring.
placeholder_time_capture = handle_time_capture
placeholder_save_action = save_to_dataset

# --- Functions for Model Prediction ---
MODEL_REPO_ID = "ddecosmo/lanternfly_classifier"
ZIP_FILENAME = "autogluon_image_predictor_dir.zip"
CLASS_LABELS = {0: "Lanternfly", 1: "Other Insect", 2: "No Insect"}
CACHE_DIR = pathlib.Path("hf_assets")
EXTRACT_DIR = CACHE_DIR / "predictor_native"
PREDICTOR = None


def _prepare_predictor_dir():
    """Download and unzip the AutoGluon predictor archive from the Hub.

    Returns:
        str: path to the extracted predictor directory. If the archive
        contains a single top-level directory, that directory is returned;
        otherwise the extraction root itself is.
    """
    CACHE_DIR.mkdir(parents=True, exist_ok=True)
    token = os.getenv("HF_TOKEN", None)
    local_zip = huggingface_hub.hf_hub_download(
        repo_id=MODEL_REPO_ID,
        filename=ZIP_FILENAME,
        repo_type="model",
        token=token,
        local_dir=str(CACHE_DIR),
        local_dir_use_symlinks=False,
    )
    # Re-extract from scratch so stale files never mix with a new archive.
    if EXTRACT_DIR.exists():
        shutil.rmtree(EXTRACT_DIR)
    EXTRACT_DIR.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(local_zip, "r") as zf:
        zf.extractall(str(EXTRACT_DIR))
    contents = list(EXTRACT_DIR.iterdir())
    if len(contents) == 1 and contents[0].is_dir():
        return str(contents[0])
    return str(EXTRACT_DIR)


# Load the predictor at import time; on any failure the app keeps running
# with PREDICTOR = None and do_predict() reports the error gracefully.
try:
    PREDICTOR_DIR = _prepare_predictor_dir()
    PREDICTOR = autogluon.multimodal.MultiModalPredictor.load(PREDICTOR_DIR)
    PREDICTOR_LOAD_STATUS = "✅ AutoGluon Predictor loaded successfully."
    print(PREDICTOR_LOAD_STATUS)
except Exception as e:
    PREDICTOR_LOAD_STATUS = f"❌ Failed to load AutoGluon Predictor: {e}"
    print(PREDICTOR_LOAD_STATUS)
    PREDICTOR = None


def do_predict(pil_img: PIL.Image.Image):
    """Classify an image with the loaded AutoGluon predictor.

    Args:
        pil_img: the input image, or None.

    Returns:
        tuple[dict, str]: (label -> probability mapping, human-readable
        confidence summary). The error paths return the same 2-tuple shape.

    FIX: the error branches previously returned 3-tuples while the success
    path returned a 2-tuple, so downstream unpacking could only match one of
    the two shapes. All paths now return exactly two values.
    FIX: the temp directory created for the image is now cleaned up
    (the original tempfile.mkdtemp() directory was never removed).
    """
    if PREDICTOR is None:
        return {"Error": 1.0}, "Model not loaded."
    if pil_img is None:
        return {"No Image": 1.0}, "No image provided."
    # AutoGluon's predict_proba takes a DataFrame of image *paths*, so the
    # in-memory image is written to a short-lived temp file first.
    with tempfile.TemporaryDirectory() as tmpdir:
        img_path = pathlib.Path(tmpdir) / "input.png"
        pil_img.save(img_path)
        df = pd.DataFrame({"image": [str(img_path)]})
        proba_df = PREDICTOR.predict_proba(df).rename(columns=CLASS_LABELS)
    row = proba_df.iloc[0]
    pretty_dict = {label: float(row.get(label, 0.0)) for label in CLASS_LABELS.values()}
    confidence_info = ", ".join(f"{label}: {prob:.2f}" for label, prob in pretty_dict.items())
    return pretty_dict, confidence_info


# ==============================================================================
# CELL 3: CORE LOGIC FOR TAB 2 (KDE ANALYSIS)
# ==============================================================================
# Bounding box of the CMU campus area used for the KDE visualizations.
pittsburgh_lat_min = 40.43950159029883
pittsburgh_lat_max = 40.44787067820301
pittsburgh_lon_min = -79.95054304624013
pittsburgh_lon_max = -79.93588847945053


def load_dataframe_from_huggingface():
    """Load the lanternfly sightings metadata from the Hugging Face Hub.

    Returns:
        pandas.DataFrame on success, or None on any failure (logged).
    """
    try:
        print("Loading data directly from Hugging Face dataset...")
        dataset = load_dataset(
            "rlogh/lanternfly-data",
            data_files="metadata/entries.jsonl",
            split="train",
        )
        df = dataset.to_pandas()
        print("✅ Data successfully loaded into a DataFrame.")
        return df
    except Exception as e:
        print(f"❌ Error loading data from Hugging Face: {e}")
        return None


def calculate_kde_from_dataframe(df):
    """Fit a 2-D Gaussian KDE over the sightings in *df*.

    Args:
        df: DataFrame with 'latitude' and 'longitude' columns. The caller's
            DataFrame is NOT modified (the original used inplace dropna,
            which mutated the argument).

    Returns:
        tuple: (latitudes, longitudes, kde_object, error_message) where
        error_message is None on success; on failure the first three are None.
    """
    try:
        if 'latitude' not in df.columns or 'longitude' not in df.columns:
            return None, None, None, "Error: DataFrame must contain 'latitude' and 'longitude' columns."
        # Work on a filtered copy instead of mutating the caller's frame.
        df = df.dropna(subset=['latitude', 'longitude'])
        latitudes = df['latitude'].values
        longitudes = df['longitude'].values
        # gaussian_kde expects shape (n_dims, n_points): row 0 = lon, row 1 = lat.
        coordinates = np.vstack([longitudes, latitudes])
        kde_object = gaussian_kde(coordinates)
        return latitudes, longitudes, kde_object, None
    except Exception as e:
        return None, None, None, f"Error calculating KDE from DataFrame: {e}"


def find_hotspot_landmark(original_latitudes, original_longitudes, kde_object):
    """
    Finds the hotspot and identifies the closest landmark from a
    predefined custom list of campus locations.
    """
    # 1. Custom dictionary of important campus landmarks: name -> (lat, lon).
    CAMPUS_LANDMARKS = {
        "Scaife Hall": (40.441742986804336, -79.94725195600002),
        "Hunt Library": (40.44097574857165, -79.94362666281333),
        "Cohon University Center": (40.44401378993309, -79.94172335009584),
        "Gates Hillman Complex": (40.4436463605335, -79.94442701667683),
        "Wean Hall": (40.44267896399903, -79.94582169457243),
        "Gesling Stadium": (40.443038206822905, -79.94038027450188),
        "The Fence": (40.44221744932438, -79.9435687098247),
    }

    # 2. Find the observed sighting with the highest KDE density.
    all_coords = np.vstack([original_longitudes, original_latitudes])
    densities = kde_object(all_coords)
    hotspot_index = np.argmax(densities)
    hotspot_lat = original_latitudes[hotspot_index]
    hotspot_lon = original_longitudes[hotspot_index]

    # 3. Distance between two coordinates. A simple Euclidean distance in
    # degree space is good enough for a small area like a campus.
    def distance(lat1, lon1, lat2, lon2):
        return math.sqrt((lat1 - lat2) ** 2 + (lon1 - lon2) ** 2)

    # 4. Landmark with the smallest distance to the hotspot.
    closest_landmark = min(
        CAMPUS_LANDMARKS.keys(),
        key=lambda landmark: distance(
            hotspot_lat, hotspot_lon,
            CAMPUS_LANDMARKS[landmark][0], CAMPUS_LANDMARKS[landmark][1],
        ),
    )

    return f"📈 **Hotspot Analysis**: The highest concentration was found closest to **{closest_landmark}** on campus."
def plot_kde_and_points_for_gradio(min_lat, max_lat, min_lon, max_lon, original_latitudes, original_longitudes, kde_object): heatmap_path = "lanternfly_kde_heatmap.png" x, y = np.mgrid[min_lon:max_lon:100j, min_lat:max_lat:100j] positions = np.vstack([x.ravel(), y.ravel()]) z = kde_object(positions).reshape(x.shape) z_normalized = (z - z.min()) / (z.max() - z.min()) if z.max() > z.min() else np.zeros_like(z) fig, ax = plt.subplots(figsize=(8, 8)) im = ax.imshow(z_normalized.T, origin='lower', extent=[min_lon, max_lon, min_lat, max_lat], cmap='hot', aspect='auto') fig.colorbar(im, ax=ax, label='Normalized Density (0-1)') ax.set_title('Lanternfly Sightings KDE Heatmap (Static)') plt.savefig(heatmap_path, bbox_inches='tight') plt.close(fig) m_colored_points = folium.Map() bounds = [[min_lat, min_lon], [max_lat, max_lon]] m_colored_points.fit_bounds(bounds) original_coordinates = np.vstack([original_longitudes, original_latitudes]) density_at_points = kde_object(original_coordinates) density_normalized_for_color = (density_at_points - density_at_points.min()) / (density_at_points.max() - density_at_points.min() + 1e-9) max_density = density_at_points.max() colormap = cm.get_cmap('viridis') for lat, lon, density_norm_color in zip(original_latitudes, original_longitudes, density_normalized_for_color): if min_lat <= lat <= max_lat and min_lon <= lon <= max_lon: color = matplotlib.colors.rgb2hex(colormap(density_norm_color)) raw_density = kde_object([lon, lat])[0] normalized_tooltip_density = raw_density / max_density if max_density > 0 else 0 folium.CircleMarker( location=[lat, lon], radius=5, color=color, fill=True, fill_color=color, fill_opacity=0.7, tooltip=f"Normalized Density: {normalized_tooltip_density:.4f}" ).add_to(m_colored_points) return heatmap_path, m_colored_points._repr_html_() import joblib # Make sure this is imported at the top def load_kde_from_hub(): """ Downloads the pre-trained KDE model from the Hugging Face Hub and loads it. 
""" try: print("Downloading pre-trained KDE model...") model_path = huggingface_hub.hf_hub_download( repo_id="ddecosmo/lanternfly-kde-model", # Use the same repo_id from the upload script filename="kde_model.joblib", repo_type="model" ) kde_model = joblib.load(model_path) print("✅ Pre-trained KDE model loaded.") return kde_model except Exception as e: print(f"❌ Failed to load KDE model from Hub: {e}") return None def run_full_analysis_and_update_ui(): """ This function is now much faster. It loads the pre-trained KDE and all the raw data points for visualization. """ # --- Load both the pre-trained model and the raw data --- kde_object = load_kde_from_hub() lanternfly_df = load_dataframe_from_huggingface() if kde_object is None or lanternfly_df is None: return gr.Image(visible=False), gr.HTML("