# -*- coding: utf-8 -*- """Untitled2.ipynb Automatically generated by Colab. Original file is located at https://colab.research.google.com/drive/1UcsSFSmZqIdAQTsD_4_CmwwAcAzz0h60 """ import numpy as np import pandas as pd import os import io import matplotlib.pyplot as plt import matplotlib.cm as cm import folium import matplotlib.colors from scipy.stats import gaussian_kde from PIL import Image import gradio as gr import huggingface_hub from huggingface_hub import HfApi, hf_hub_download, create_repo, file_exists, upload_file import tempfile import pathlib import json import uuid import shutil import zipfile from datetime import datetime # Import MultiModalPredictor for the model loading logic # NOTE: This import assumes 'autogluon.multimodal' is installed in the environment. try: from autogluon.multimodal import MultiModalPredictor AUTOGLUON_IMPORTED = True except ImportError: # Set flag to False if the complex dependency is missing AUTOGLUON_IMPORTED = False class MultiModalPredictor: @staticmethod def load(path): raise ImportError("AutoGluon MultiModalPredictor is not installed or failed to import.") # --- 1. CLASSIFICATION CONFIGURATION & MODEL LOADING --- MODEL_REPO_ID = "ddecosmo/lanternfly_classifier" ZIP_FILENAME = "autogluon_image_predictor_dir.zip" MODEL_DIR_NAME = "autogluon_predictor_extracted" CLASSIFICATION_LABELS = ["Lanternfly", "Other Insect", "Neither"] PREDICTOR = None MODEL_STATUS = "Attempting to load model..." # Robust download and extraction of the AutoGluon model zip file def _prepare_predictor_dir(repo_id, zip_filename, extract_dir_name) -> str: """Downloads the zipped model and extracts it to a clean directory.""" base_extract_dir = os.path.join(os.getcwd(), extract_dir_name) try: # 1. Download the zipped model file from Hugging Face Hub zip_path = hf_hub_download(repo_id=repo_id, filename=zip_filename) # 2. Prepare directories if os.path.exists(base_extract_dir): shutil.rmtree(base_extract_dir) temp_extract_dir = os.path.join(os.getcwd(), "temp_ag_extract") os.makedirs(temp_extract_dir, exist_ok=True) # 3. Extract contents with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(temp_extract_dir) # 4. Handle nested directory structure (common with zip creation) extracted_contents = os.listdir(temp_extract_dir) if len(extracted_contents) == 1 and os.path.isdir(os.path.join(temp_extract_dir, extracted_contents[0])): final_model_dir = os.path.join(temp_extract_dir, extracted_contents[0]) shutil.move(final_model_dir, base_extract_dir) shutil.rmtree(temp_extract_dir) else: os.rename(temp_extract_dir, base_extract_dir) return base_extract_dir except Exception as e: print(f"Error during model prep: {e}") return "" # Global initialization on startup if AUTOGLUON_IMPORTED: try: predictor_dir = _prepare_predictor_dir(MODEL_REPO_ID, ZIP_FILENAME, MODEL_DIR_NAME) if predictor_dir: PREDICTOR = MultiModalPredictor.load(predictor_dir) MODEL_STATUS = f"✅ Model Active: {MODEL_REPO_ID}" else: MODEL_STATUS = "❌ Initialization failed during extraction/download." except Exception as e: PREDICTOR = None MODEL_STATUS = f"❌ Error loading model: {type(e).__name__} (Load Fail)" else: MODEL_STATUS = "❌ AutoGluon not imported. Classification tab is disabled." # Core Lanternfly classification function def classify_image(img: Image.Image): """Predicts the class of the input image using the loaded AutoGluon model.""" if PREDICTOR is None: return "MODEL FAILED TO LOAD", 0.0, 0.0, 0.0 if img is None: return "NO IMAGE PROVIDED", 0.0, 0.0, 0.0 final_output = [0.0] * len(CLASSIFICATION_LABELS) final_result = "PREDICTION FAILED" # Save image to a temporary path for AutoGluon to read temp_dir = pathlib.Path(tempfile.mkdtemp()) img_path = temp_dir / "input.png" img.save(img_path) try: df_path = pd.DataFrame({"image": [str(img_path)]}) proba_df = PREDICTOR.predict_proba(df_path, as_pandas=True) scores_dict = proba_df.iloc[0].to_dict() # Map scores to the expected order of CLASSIFICATION_LABELS scores = [float(scores_dict.get(label, 0.0)) for label in CLASSIFICATION_LABELS] predicted_class_label = max(scores_dict, key=scores_dict.get) final_output = scores final_result = f"Predicted Class: **{predicted_class_label}**" except Exception as e: final_result = f"CRITICAL PREDICTION FAILURE: {type(e).__name__} - Check AutoGluon dependencies." finally: shutil.rmtree(temp_dir) return final_result, final_output[0], final_output[1], final_output[2] # --- 2. GPS CAPTURE & SAVE CONFIGURATION & FUNCTIONS --- HF_TOKEN = os.getenv("HF_TOKEN") or os.getenv("HF_TOKEN_SPACE") DATASET_REPO = os.getenv("DATASET_REPO", "rlogh/lanternfly-data") METADATA_PATH = "metadata/entries.jsonl" api = None if HF_TOKEN and DATASET_REPO: api = HfApi(token=HF_TOKEN) try: # Ensure the dataset repository exists create_repo(DATASET_REPO, repo_type="dataset", exist_ok=True, token=HF_TOKEN) GPS_SAVE_STATUS = "✅ Dataset saving enabled." except Exception as e: GPS_SAVE_STATUS = f"⚠️ Error creating dataset repo: {e}" api = None else: GPS_SAVE_STATUS = "⚠️ Running in test mode - no HF credentials (dataset saving disabled)." def get_gps_js(): """JavaScript function to be injected into Gradio to capture GPS coordinates.""" return """ () => { // Look for the hidden textbox element by its ID const textarea = document.querySelector('#hidden_gps_input textarea'); if (!textarea) return; if (!navigator.geolocation) { textarea.value = JSON.stringify({error: "Geolocation not supported by this browser/device."}); textarea.dispatchEvent(new Event('input', { bubbles: true })); return; } // Request current position navigator.geolocation.getCurrentPosition( function(position) { const data = { latitude: position.coords.latitude, longitude: position.coords.longitude, accuracy: position.coords.accuracy, timestamp: position.timestamp }; // Write JSON string to the hidden textbox and trigger a change event textarea.value = JSON.stringify(data); textarea.dispatchEvent(new Event('input', { bubbles: true })); }, function(err) { textarea.value = JSON.stringify({ error: err.message }); textarea.dispatchEvent(new Event('input', { bubbles: true })); }, { enableHighAccuracy: true, timeout: 10000 } ); } """ def handle_gps_location(json_str): """Parses the GPS JSON string and updates the Gradio text boxes.""" try: data = json.loads(json_str) if 'error' in data: status_msg = f"❌ **GPS Error**: {data['error']}" return status_msg, "", "", "", "" lat = str(data.get('latitude', '')) lon = str(data.get('longitude', '')) accuracy = str(data.get('accuracy', '')) timestamp_ms = data.get('timestamp') # Convert timestamp (milliseconds since epoch) to ISO string device_ts = "" if timestamp_ms and isinstance(timestamp_ms, (int, float)): device_ts = datetime.fromtimestamp(timestamp_ms / 1000).isoformat() status_msg = f"✅ **GPS Captured**: {lat[:8]}, {lon[:8]} (accuracy: {accuracy}m)" return status_msg, lat, lon, accuracy, device_ts except Exception as e: status_msg = f"❌ **Error parsing GPS data**: {str(e)}" return status_msg, "", "", "", "" def _save_image_to_repo(pil_img: Image.Image, dest_rel_path: str) -> None: """Uploads a PIL image into the dataset repo via a memory buffer.""" img_bytes = io.BytesIO() pil_img.save(img_bytes, format="JPEG", quality=90) img_bytes.seek(0) upload_file( path_or_fileobj=img_bytes, path_in_repo=dest_rel_path, repo_id=DATASET_REPO, repo_type="dataset", token=HF_TOKEN, commit_message=f"Upload image {dest_rel_path}", ) def _append_jsonl_in_repo(new_row: dict) -> None: """Appends a new JSON line to the metadata file in the dataset repo.""" buf = io.BytesIO() existing_lines = [] try: # 1. Download existing metadata file if it exists if file_exists(DATASET_REPO, METADATA_PATH, repo_type="dataset", token=HF_TOKEN): local_path = hf_hub_download( repo_id=DATASET_REPO, filename=METADATA_PATH, repo_type="dataset", token=HF_TOKEN ) with open(local_path, "r", encoding="utf-8") as f: existing_lines = f.read().splitlines() except Exception: # Ignore download failure if the file doesn't exist yet pass # 2. Append the new line existing_lines.append(json.dumps(new_row, ensure_ascii=False)) data = "\n".join(existing_lines).encode("utf-8") buf.write(data); buf.seek(0) # 3. Upload the updated file upload_file( path_or_fileobj=buf, path_in_repo=METADATA_PATH, repo_id=DATASET_REPO, repo_type="dataset", token=HF_TOKEN, commit_message=f"Append 1 entry at {datetime.now().isoformat()}Z", ) def save_to_dataset(image, lat, lon, accuracy_m, device_ts): """Validates data and saves the image and metadata to the Hugging Face dataset.""" try: if image is None: return "❌ **Error**: No image captured.", "" if not lat or not lon: return "❌ **Error**: GPS coordinates missing.", "" # Convert image to PIL if it's a numpy array (common in Gradio) if isinstance(image, np.ndarray): image = Image.fromarray(image.astype('uint8')) # --- Test Mode --- if not api: img_id = str(uuid.uuid4()) timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S") row = {"id": img_id, "image": f"test_{timestamp_str}_{img_id[:8]}.jpg", "latitude": float(lat), "longitude": float(lon), "mode": "test"} status = f"🔍 **Test Mode**: Data validated successfully! Sample {img_id[:8]}" preview = json.dumps(row, indent=2) return status, preview # --- Production Mode --- sample_id = str(uuid.uuid4()) timestamp_str = datetime.now().strftime("%Y%m%d_%H%M%S") image_rel_path = f"images/lanternfly_{timestamp_str}_{sample_id[:8]}.jpg" # 1. Save image _save_image_to_repo(image, image_rel_path) server_ts_utc = datetime.now().isoformat() + "Z" # 2. Prepare and save metadata row = { "id": sample_id, "image": image_rel_path, "latitude": float(lat), "longitude": float(lon), "accuracy_m": float(accuracy_m) if accuracy_m else None, "device_timestamp": device_ts if device_ts else None, "server_timestamp_utc": server_ts_utc, } _append_jsonl_in_repo(row) status = f"✅ **Success!** Saved to dataset! Image: `{image_rel_path}`" preview = json.dumps(row, indent=2) return status, preview except Exception as e: error_msg = f"❌ **Error during save**: {str(e)}" return error_msg, "" # --- 3. KDE CONFIGURATION & FUNCTIONS (UPDATED FOR LIVE DATA) --- HUGGINGFACE_DATA_REPO = "rlogh/lanternfly-data" METADATA_PATH = "metadata/entries.jsonl" # Define the Pittsburgh coordinate range (used for visualization extent) pittsburgh_lat_min, pittsburgh_lat_max = 40.3, 40.6 pittsburgh_lon_min, pittsburgh_lon_max = -80.2, -79.8 def load_lanternfly_data_from_hf(): """Downloads the JSONL metadata file from HF and extracts latitude/longitude.""" try: # Download the file local_path = hf_hub_download( repo_id=HUGGINGFACE_DATA_REPO, filename=METADATA_PATH, repo_type="dataset" ) latitudes = [] longitudes = [] # Parse the JSONL file with open(local_path, 'r', encoding='utf-8') as f: for line in f: try: data = json.loads(line) lat = data.get('latitude') lon = data.get('longitude') if isinstance(lat, (float, int)) and isinstance(lon, (float, int)): # Filter points to be within the Pittsburgh area for relevance if pittsburgh_lat_min <= lat <= pittsburgh_lat_max and \ pittsburgh_lon_min <= lon <= pittsburgh_lon_max: latitudes.append(lat) longitudes.append(lon) except json.JSONDecodeError: continue # Skip malformed lines if not latitudes: return None, None, "Error: Found no valid coordinates in the dataset." return np.array(latitudes), np.array(longitudes), None except Exception as e: return None, None, f"Error downloading or parsing HF data: {type(e).__name__} - {e}" def calculate_kde_and_points(): """Loads data, calculates KDE, and prepares data for visualization.""" latitudes, longitudes, error = load_lanternfly_data_from_hf() if error: return None, None, None, error try: # Combine coordinates into a 2D array for KDE coordinates = np.vstack([longitudes, latitudes]) # Compute the kernel density estimate kde_object = gaussian_kde(coordinates) return latitudes, longitudes, kde_object, None except Exception as e: return None, None, None, f"Error calculating KDE: {type(e).__name__} - {e}" def plot_kde_and_points(min_lat, max_lat, min_lon, max_lon, original_latitudes, original_longitudes, kde_object): """Generates an interactive Folium map with points colored by KDE density.""" # --- Folium Interactive Map with Colored Points --- # 1. Calculate density at each original point original_coordinates = np.vstack([original_longitudes, original_latitudes]) density_at_original_points = kde_object(original_coordinates) # Normalize density for coloring density_normalized = (density_at_original_points - density_at_original_points.min()) / (density_at_original_points.max() - density_at_original_points.min() + 1e-9) # 2. Setup map colormap = cm.get_cmap('viridis') map_center_lat = np.mean(original_latitudes) map_center_lon = np.mean(original_longitudes) m_colored_points = folium.Map(location=[map_center_lat, map_center_lon], zoom_start=12) # 3. Add points to map for lat, lon, density_norm in zip(original_latitudes, original_longitudes, density_normalized): color = matplotlib.colors.rgb2hex(colormap(density_norm)) folium.CircleMarker( location=[lat, lon], radius=5, color=color, fill=True, fill_color=color, fill_opacity=0.7, tooltip=f"Lat: {lat:.5f}, Lon: {lon:.5f}" ).add_to(m_colored_points) colored_points_map_html = m_colored_points._repr_html_() # The original plot_kde_and_points also returned a Matplotlib image, but the Gradio tab was updated to remove it. # We return None for the image output to match the function signature expected by Gradio. return None, colored_points_map_html def update_visualization_live(): """Main visualization function for the Gradio interface.""" latitudes, longitudes, kde_object, error = calculate_kde_and_points() if error: # Return blank outputs and the error message return None, f"