Spaces:
Build error
Build error
| import gradio as gr | |
| import pandas as pd | |
| import time | |
| import torch | |
| import os | |
| import torchvision.transforms as transforms | |
| from torchvision import datasets | |
| import torch.nn.functional as F | |
| from torch.utils.data import DataLoader | |
# Reference coordinates for each recording site, looked up by get_base().
# Presumably (latitude, longitude) in decimal degrees -- TODO confirm.
BASE = {
    'ottawa': (45.30326753851309, -75.93640391349997),
    'ali_home': (37.88560412289598, -122.30218612514359),
    # NOTE(review): josh_home shares its longitude with ali_home and its
    # first value with cory -- confirm these coordinates are intended.
    'josh_home': (37.8697406, -122.30218612514359),
    'cory': (37.8697406, -122.281570),
}
def get_base(filename):
    """Return the BASE entry for the site whose tag occurs in *filename*.

    Tags are tested in a fixed order -- "home", "ottawa", "josh" -- and the
    first one found as a substring wins. A name containing none of them maps
    to the "cory" entry.
    """
    # NOTE(review): "home" is tested before "josh", so a name containing both
    # (e.g. "josh_home...") resolves to ali_home -- confirm this is intended.
    tag_to_site = (
        ("home", "ali_home"),
        ("ottawa", "ottawa"),
        ("josh", "josh_home"),
    )
    for tag, site in tag_to_site:
        if tag in filename:
            return BASE[site]
    return BASE["cory"]
| from datasets import load_dataset | |
# Fetch the ADS-B dump dataset from the Hugging Face Hub when the module loads.
dataset = load_dataset("IndoorOutdoor/1090dumpData")

# NOTE(review): despite the variable name, `cache_files` presumably describes
# the cached data files rather than being a single directory path -- confirm
# before feeding it to os.walk() as the disabled snippet below does.
dataset_dir = dataset.cache_files
print("Dataset directory:", dataset_dir)

# Disabled debugging aid: list every file below the dataset location.
# for root, dirs, files in os.walk(dataset_dir):
#     for file in files:
#         print(os.path.join(root, file))  # Print full file paths
def read_dump1090_file(filename):
    """Parse a dump1090 text dump into per-aircraft message histories.

    The file content is split on ``*`` (each decoded message starts with
    ``*<hex>;``). Everything before the first ``*`` is discarded, and messages
    that do not contain ``ICAO Address`` are skipped.

    Sample message::

        *8da60e809910c81790b40c0c3a21;
        CRC: 000000
        RSSI: -27.8 dBFS
        Score: 1800
        Time: 20160.42us
        DF:17 AA:A60E80 CA:5 ME:9910C81790B40C
         Extended Squitter Airborne velocity over ground, subsonic (19/1)
          ICAO Address:  A60E80 (Mode S / ADS-B)
          Air/Ground:    airborne
          GNSS delta:    275 ft
          Heading:       47
          Speed:         273 kt groundspeed
          Vertical rate: 2816 ft/min GNSS

    Args:
        filename: Path of the dump1090 output file.

    Returns:
        A dict mapping each ICAO address to a list of
        ``[time, rssi, delta_rssi, lat, lon, alt]`` records in file order.
        ``time`` is the number before ``us`` on the ``Time:`` line,
        ``delta_rssi`` is the absolute RSSI change from that aircraft's
        previous record (``0`` for its first record), and ``lat`` / ``lon`` /
        ``alt`` are ``None`` when the message has no parseable value.

    Raises:
        IndexError: If a message with an ICAO address lacks the
            ``ICAO Address: ``, ``Time: `` or ``RSSI: `` marker.
        ValueError: If the time or RSSI field is not numeric.
    """
    with open(filename, 'r') as dump:
        # Drop the text preceding the first '*'; it is not a message.
        messages = dump.read().split('*')[1:]

    adsb_dict = {}
    for message in messages:
        if "ICAO Address" not in message:
            continue

        icao_id = message.split('ICAO Address: ')[1].split(' (')[0]
        # Named timestamp_us so the imported `time` module is not shadowed.
        timestamp_us = float(message.split('Time: ')[1].split('us')[0])
        rssi = float(message.split('RSSI: ')[1].split(' dBFS')[0])

        # Position is best-effort: the value before '(' may be blank.
        lat = lon = None
        try:
            if "CPR latitude:" in message:
                lat = float(message.split('CPR latitude:')[1].split('(')[0])
                lon = float(message.split('CPR longitude:')[1].split('(')[0])
        except (IndexError, ValueError):
            # Missing marker or non-numeric text: keep what parsed so far.
            pass

        # Altitude is best-effort as well; two textual layouts are recognised.
        alt = None
        try:
            if "Altitude: " in message and "ft barometric" in message:
                alt = float(message.split('Altitude:')[1].split('ft')[0])
            if "Baro altitude: " in message and "ft" in message:
                alt = float(message.split('altitude:')[1].split('ft')[0])
        except (IndexError, ValueError):
            # Non-standard altitude text: leave whatever was parsed so far.
            pass

        history = adsb_dict.setdefault(icao_id, [])
        # Index 1 of a record is its RSSI; the first record has no predecessor.
        delta_rssi = abs(history[-1][1] - rssi) if history else 0
        history.append([timestamp_us, rssi, delta_rssi, lat, lon, alt])

    return adsb_dict
# Compare the model output with the ground truth, sector by sector rather than
# just indoor vs. outdoor.
def compute_stats_sector(sectors_model, sector_gt):
    """Score per-sector predictions against per-sector ground truth.

    Sectors are treated as a ring: index ``i`` neighbours ``i - 1`` and
    ``i + 1`` modulo the number of sectors. For every ground-truth sector:

    * truth is ``1``: a true positive if the model flags that sector or either
      neighbour, otherwise a false negative;
    * truth is not ``1`` and the model flags the sector: a true positive if
      either neighbouring sector is positive in the ground truth, otherwise a
      false positive;
    * truth is not ``1`` and the model does not flag the sector: a true
      negative.

    The sector count is taken from ``len(sector_gt)`` instead of a hard-coded
    8, so rings of other sizes work; results for 8 sectors are unchanged.

    Args:
        sectors_model: Per-sector model output; a value ``> 0`` means flagged.
            Must be at least as long as ``sector_gt``.
        sector_gt: Per-sector ground truth (``1`` marks a positive sector).

    Returns:
        ``[TP, FP, FN, TN]``, each divided by the number of sectors. All zeros
        when ``sector_gt`` is empty.
    """
    num_sectors = len(sector_gt)
    if num_sectors == 0:
        # Nothing to score; avoid dividing by zero.
        return [0.0, 0.0, 0.0, 0.0]

    TP = FP = FN = TN = 0
    for i, truth in enumerate(sector_gt):
        prev_i = (i - 1) % num_sectors
        next_i = (i + 1) % num_sectors
        if truth == 1:
            if (sectors_model[i] > 0
                    or sectors_model[next_i] > 0
                    or sectors_model[prev_i] > 0):
                TP += 1
            else:
                FN += 1
        elif sectors_model[i] > 0:
            # A detection next to a true sector still counts as a hit.
            if sector_gt[prev_i] > 0 or sector_gt[next_i] > 0:
                TP += 1
            else:
                FP += 1
        else:
            TN += 1

    return [TP / num_sectors, FP / num_sectors,
            FN / num_sectors, TN / num_sectors]
# Compare the model output with the ground truth for a binary model, i.e. one
# that only distinguishes indoor from outdoor.
def compute_stats_in_out(sectors_model, indoor_gt):
    """Return a one-hot ``[TP, FP, FN, TN]`` for an indoor/outdoor decision.

    The model counts as having detected something when any entry of
    ``sectors_model`` is truthy.

    * ground truth indoor, detection: ``[0, 1, 0, 0]`` (false positive)
    * ground truth indoor, no detection: ``[0, 0, 0, 1]`` (true negative)
    * ground truth outdoor, detection: ``[1, 0, 0, 0]`` (true positive)
    * ground truth outdoor, no detection: ``[0, 0, 1, 0]`` (false negative)
    """
    detected = any(sectors_model)
    if indoor_gt:
        return [0, 1, 0, 0] if detected else [0, 0, 0, 1]
    return [1, 0, 0, 0] if detected else [0, 0, 1, 0]
def read_configuration(filename):
    """Load the experiment metadata CSV into a dict keyed by scenario.

    The first line is treated as a header and skipped, as are empty lines.
    Each remaining line is split on ``,``: column 0 is the data file name,
    column 1 the scenario name, column 2 the indoor flag, and every further
    column a per-sector flag. A flag is set only when the cell is exactly
    ``"TRUE"``.

    Args:
        filename: Path of the CSV file.

    Returns:
        ``{scenario: {'sectors': [0 or 1, ...], 'indoor': bool, 'file': str}}``.
        When a scenario name repeats, the last line wins.

    Raises:
        IndexError: If a non-empty line has fewer than three columns.
    """
    with open(filename, 'r') as config:
        rows = config.read().split('\n')[1:]  # ignore the header

    exp = {}
    for row in rows:
        if not row:
            continue
        tokens = row.split(',')
        data_file, scenario = tokens[0], tokens[1]
        exp[scenario] = {
            'sectors': [1 if flag == "TRUE" else 0 for flag in tokens[3:]],
            # Always a bool (the falsy case used to be the int 0).
            'indoor': tokens[2] == "TRUE",
            "file": data_file,
        }
    return exp
# In-memory leaderboard; evaluate_model() rebinds this with one more row per
# submission.
leaderboard_data = pd.DataFrame(columns=["Username", "Execution Time (s)", "Accuracy", "Status"])

# `data` directory for persistent storage
HF_STORAGE_DIR = "data"
# exist_ok avoids the check-then-create race of testing os.path.exists() first.
os.makedirs(HF_STORAGE_DIR, exist_ok=True)
def _leaderboard_row(username, exec_time, accuracy, status):
    """Build a one-row DataFrame in leaderboard column order."""
    return pd.DataFrame(
        [[username, exec_time, accuracy, status]],
        columns=["Username", "Execution Time (s)", "Accuracy", "Status"],
    )


def evaluate_model(username, file):
    """Evaluate a submission and record the outcome on the leaderboard.

    Args:
        username: Submitter name from the UI. ``None`` or a blank name leaves
            the leaderboard untouched.
        file: Either a path string, or an object with ``read()`` whose bytes
            are copied into ``HF_STORAGE_DIR``. ``None`` leaves the
            leaderboard untouched.

    Returns:
        The leaderboard as a list of
        ``[username, execution_time, accuracy, status]`` rows. Failed
        submissions get an execution time of ``inf`` and accuracy ``0``;
        successful ones trigger a re-sort by accuracy (descending), then
        execution time (ascending).
    """
    global leaderboard_data

    username = (username or "").strip()
    if not username or file is None:
        return leaderboard_data.values.tolist()

    if isinstance(file, str):
        temp_file_path = file
    else:
        # The username is untrusted UI input: keep only filename-safe
        # characters so it cannot escape HF_STORAGE_DIR (e.g. "../x").
        safe_name = "".join(
            ch if ch.isalnum() or ch in "-_" else "_" for ch in username
        )
        temp_file_path = os.path.join(HF_STORAGE_DIR, f"{safe_name}_model.pt")
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(file.read())
    # NOTE(review): temp_file_path is not read again below -- the uploaded
    # model is never loaded in this function.

    try:
        # NOTE(review): `dataset` is the object returned by load_dataset();
        # indexing it with "metadata.csv" and handing the result to
        # read_configuration(), which open()s a path, looks unlikely to work.
        # Confirm how metadata.csv should be located.
        exp = read_configuration(dataset["metadata.csv"])
        # TODO: for every scenario in `exp`, run the submitted function on its
        # file to obtain `sectors_model`, score it with compute_stats_sector()
        # and compute_stats_in_out() against the scenario's ground truth, and
        # average TP/FP/FN/TN over all scenarios.
    except Exception as e:
        leaderboard_data = pd.concat(
            [leaderboard_data,
             _leaderboard_row(username, float("inf"), 0, f"Model Load Error: {str(e)}")],
            ignore_index=True,
        )
        return leaderboard_data.values.tolist()

    # Measure execution time
    start_time = time.time()
    correct = 0
    total = 0
    try:
        # NOTE(review): `model` and `test_loader` are not defined in the
        # visible part of this file; a failure here (including that NameError)
        # is recorded on the leaderboard instead of crashing the UI callback.
        with torch.no_grad():
            for images, labels in test_loader:
                outputs = model(images)
                _, predicted = torch.max(outputs, 1)
                correct += (predicted == labels).sum().item()
                total += labels.size(0)
    except Exception as e:
        leaderboard_data = pd.concat(
            [leaderboard_data,
             _leaderboard_row(username, float("inf"), 0, f"Evaluation Error: {e}")],
            ignore_index=True,
        )
        return leaderboard_data.values.tolist()

    execution_time = round(time.time() - start_time, 4)
    # An empty test set scores 0 rather than dividing by zero.
    accuracy = round(100 * correct / total, 2) if total else 0.0
    status = "Success" if accuracy > 0 else "Incorrect Model"

    # Append to leaderboard
    leaderboard_data = pd.concat(
        [leaderboard_data,
         _leaderboard_row(username, execution_time, accuracy, status)],
        ignore_index=True,
    )
    # Sort leaderboard: prioritize accuracy first, then execution time
    leaderboard_data = leaderboard_data.sort_values(
        by=["Accuracy", "Execution Time (s)"],
        ascending=[False, True],
    ).reset_index(drop=True)
    return leaderboard_data.values.tolist()
# Build the Gradio interface: a submission form plus the leaderboard table.
with gr.Blocks() as demo:
    gr.Markdown("# 🚀 Model Submission & Leaderboard (Hugging Face Spaces)")

    # NOTE(review): the original indentation was lost, so which widgets sit
    # inside this Row is an assumption -- confirm the intended layout.
    with gr.Row():
        username_input = gr.Textbox(label="Username")
        file_input = gr.File(label="Upload PyTorch Model (.pt)")
        # NOTE(review): requirements_input is not passed to any callback.
        requirements_input = gr.File(label="Upload requirements.txt")
        submit_button = gr.Button("Submit Model")

    leaderboard_display = gr.Dataframe(
        headers=["Username", "Execution Time (s)", "Accuracy", "Status"],
        label="Leaderboard",
    )

    def update_leaderboard(username, file):
        """Click handler: evaluate the submission and return the table rows."""
        return evaluate_model(username, file)

    submit_button.click(
        fn=update_leaderboard,
        inputs=[username_input, file_input],
        outputs=leaderboard_display,
    )

# Launch Gradio app
demo.launch()