leaderboard3 / app.py
gmmcleod's picture
added requirements
3dbd479 verified
import gradio as gr
import pandas as pd
import time
import torch
import os
import torchvision.transforms as transforms
from torchvision import datasets
import torch.nn.functional as F
from torch.utils.data import DataLoader
# Receiver base coordinates keyed by site name; values look like
# (latitude, longitude) pairs in decimal degrees.
# NOTE(review): 'josh_home' reuses cory's first value and ali_home's second
# value -- confirm these are real coordinates and not a copy/paste placeholder.
BASE = {
    'ottawa': (45.30326753851309, -75.93640391349997),
    'ali_home': (37.88560412289598, -122.30218612514359),
    'josh_home': (37.8697406, -122.30218612514359),
    'cory': (37.8697406, -122.281570),
}


def get_base(filename):
    """Return the BASE coordinate pair for the site named in *filename*.

    The site is picked by substring match on the file name. "josh" is tested
    before the generic "home" so that a name such as ``josh_home_1.txt``
    resolves to ``BASE["josh_home"]`` instead of being captured by the
    ``ali_home`` branch.

    Args:
        filename: Name (or path) of a capture file.

    Returns:
        The matching 2-tuple from BASE; ``BASE["cory"]`` when nothing matches.
    """
    if "josh" in filename:
        return BASE["josh_home"]
    if "home" in filename:
        return BASE["ali_home"]
    if "ottawa" in filename:
        return BASE["ottawa"]
    return BASE["cory"]
from datasets import load_dataset
# Load the "IndoorOutdoor/1090dumpData" dataset once at import time; `dataset`
# is read again later by evaluate_model().
dataset = load_dataset("IndoorOutdoor/1090dumpData")
# Record where the dataset's cached files live and print it for debugging.
# NOTE(review): `cache_files` may be cache-file metadata rather than a single
# directory path -- confirm before feeding it to os.walk as sketched below.
dataset_dir = dataset.cache_files
print("Dataset directory:", dataset_dir)
# Disabled debugging aid: print the full path of every file under dataset_dir.
#for root, dirs, files in os.walk(dataset_dir):
# for file in files:
# print(os.path.join(root, file)) # Print full file paths
def read_dump1090_file(filename):
    """Parse a dump1090 text dump into per-aircraft message histories.

    The file content is split on ``*``; the text before the first ``*`` is
    dropped and only chunks containing an ``ICAO Address`` line are kept.

    Args:
        filename: Path of the dump1090 text output to read.

    Returns:
        dict mapping the ICAO address string to a list of rows
        ``[time, rssi, delta_rssi, lat, lon, alt]`` in file order.
        ``delta_rssi`` is the absolute RSSI change from the previous row of
        the same aircraft (``0`` for its first row). ``lat``, ``lon`` and
        ``alt`` are ``None`` when the message has no parseable value.

    Raises:
        IndexError: if a kept message lacks a ``Time: `` or ``RSSI: `` field.
        ValueError: if one of those fields is not a number.
    """
    with open(filename, 'r') as dump:
        # Everything before the first '*' is not a message.
        messages = dump.read().split('*')[1:]

    # Sample message:
    # 8da60e809910c81790b40c0c3a21;
    # CRC: 000000
    # RSSI: -27.8 dBFS
    # Score: 1800
    # Time: 20160.42us
    # DF:17 AA:A60E80 CA:5 ME:9910C81790B40C
    # Extended Squitter Airborne velocity over ground, subsonic (19/1)
    # ICAO Address: A60E80 (Mode S / ADS-B)
    # Air/Ground: airborne
    # GNSS delta: 275 ft
    # Heading: 47
    # Speed: 273 kt groundspeed
    # Vertical rate: 2816 ft/min GNSS
    adsb_dict = {}
    for message in messages:
        if "ICAO Address" not in message:
            continue

        icao_id = message.split('ICAO Address: ')[1].split(' (')[0]
        # Named `timestamp` (not `time`) so the imported `time` module is not
        # shadowed inside this function.
        timestamp = float(message.split('Time: ')[1].split('us')[0])
        rssi = float(message.split('RSSI: ')[1].split(' dBFS')[0])

        # Position is best-effort: a missing or non-numeric value leaves the
        # field as None. Only parse failures are tolerated, nothing else.
        lat = lon = None
        try:
            if "CPR latitude:" in message:
                lat = float(message.split('CPR latitude:')[1].split('(')[0])
                lon = float(message.split('CPR longitude:')[1].split('(')[0])
        except (ValueError, IndexError):
            pass

        # Altitude is best-effort too; a "Baro altitude" line, when present,
        # overrides the plain "Altitude" line.
        alt = None
        try:
            if "Altitude: " in message and "ft barometric" in message:
                alt = float(message.split('Altitude:')[1].split('ft')[0])
            if "Baro altitude: " in message and "ft" in message:
                alt = float(message.split('altitude:')[1].split('ft')[0])
        except (ValueError, IndexError):
            pass

        history = adsb_dict.setdefault(icao_id, [])
        delta_rssi = abs(history[-1][1] - rssi) if history else 0
        history.append([timestamp, rssi, delta_rssi, lat, lon, alt])

    return adsb_dict
# Compare the model output with the ground truth, sector by sector.
# Computes stats based on sectors rather than just indoor vs outdoor.
def compute_stats_sector(sectors_model, sector_gt):
    """Return sector-level [TP, FP, FN, TN] rates for one experiment.

    Sectors form a ring, so each sector's neighbours are taken modulo the
    number of sectors. A hit in a sector adjacent to the true one still
    counts as correct:

    * ground-truth sector (== 1): TP if the model flags it or either
      neighbour, otherwise FN;
    * other sector flagged by the model: TP if either neighbouring
      ground-truth sector is set, otherwise FP;
    * other sector not flagged: TN.

    The ring size is taken from ``len(sector_gt)`` instead of a hard-coded 8,
    so 8-sector inputs give the same result as before and other sizes wrap
    correctly.

    Args:
        sectors_model: Per-sector model output; a value > 0 means "flagged".
            Must be at least as long as ``sector_gt``.
        sector_gt: Per-sector ground truth; 1 marks a true sector.

    Returns:
        ``[TP, FP, FN, TN]`` counts divided by the number of sectors
        (all zeros for an empty ground truth).
    """
    num_sectors = len(sector_gt)
    if num_sectors == 0:
        return [0.0, 0.0, 0.0, 0.0]

    tp = fp = fn = tn = 0
    for i, truth in enumerate(sector_gt):
        left = (i - 1) % num_sectors
        right = (i + 1) % num_sectors
        if truth == 1:
            if sectors_model[i] > 0 or sectors_model[right] > 0 or sectors_model[left] > 0:
                tp += 1
            else:
                fn += 1
        elif sectors_model[i] > 0:
            # Flagging a sector next to a true one is tolerated as a hit.
            if sector_gt[left] > 0 or sector_gt[right] > 0:
                tp += 1
            else:
                fp += 1
        else:
            tn += 1

    return [tp / num_sectors, fp / num_sectors, fn / num_sectors, tn / num_sectors]
# Compare the model output with the ground truth.
# This function computes stats when the model is treated as binary, i.e. it
# only answers indoor vs outdoor.
def compute_stats_in_out(sectors_model, indoor_gt):
    """Return a one-hot ``[TP, FP, FN, TN]`` list for an indoor/outdoor call.

    The model is taken to say "outdoor" when any entry of ``sectors_model``
    is truthy. With outdoor as the positive class:

    * outdoor ground truth + any sector flagged  -> TP
    * indoor ground truth  + any sector flagged  -> FP
    * outdoor ground truth + nothing flagged     -> FN
    * indoor ground truth  + nothing flagged     -> TN

    Args:
        sectors_model: Per-sector model output.
        indoor_gt: Truthy when the ground truth for the experiment is indoor.
    """
    any_sector_flagged = any(sectors_model)
    outcome = {
        (True, True): [0, 1, 0, 0],
        (True, False): [0, 0, 0, 1],
        (False, True): [1, 0, 0, 0],
        (False, False): [0, 0, 1, 0],
    }
    return outcome[(bool(indoor_gt), any_sector_flagged)]
def read_configuration(filename):
    """Load the experiment metadata CSV into a dict keyed by scenario.

    The first line is treated as a header and skipped. Every other non-blank
    line is split on commas into: data file name, scenario name, indoor flag,
    then one flag per sector. A flag counts as set only when it is exactly
    ``TRUE`` (after stripping surrounding whitespace).

    Args:
        filename: Path of the CSV file.

    Returns:
        ``{scenario: {'sectors': [0/1, ...], 'indoor': bool, 'file': str}}``.
        A repeated scenario name overwrites the earlier entry.

    Raises:
        IndexError: if a non-blank row has fewer than three fields.
    """
    with open(filename, 'r') as config_file:
        lines = config_file.read().split('\n')

    exp = {}
    for line in lines[1:]:  # ignore the header
        if not line.strip():
            continue
        tokens = [token.strip() for token in line.split(',')]
        data_file, scenario = tokens[0], tokens[1]
        exp[scenario] = {
            'sectors': [1 if flag == "TRUE" else 0 for flag in tokens[3:]],
            # Always a bool; the old code mixed True with the integer 0.
            'indoor': tokens[2] == "TRUE",
            "file": data_file,
        }
    return exp
# Initialize leaderboard storage: an in-memory DataFrame with one row per submission.
leaderboard_data = pd.DataFrame(columns=["Username", "Execution Time (s)", "Accuracy", "Status"])

# `data` directory for persistent storage
HF_STORAGE_DIR = "data"
# exist_ok=True creates the directory only when it is missing, without the
# check-then-create race of a separate os.path.exists() test.
os.makedirs(HF_STORAGE_DIR, exist_ok=True)
def evaluate_model(username, file):
    """Evaluate an uploaded model and return the refreshed leaderboard rows.

    Args:
        username: Submitter name; surrounding whitespace is stripped. An
            empty name leaves the leaderboard unchanged.
        file: Either a path string, or a file-like object whose ``read()``
            bytes are saved under ``HF_STORAGE_DIR``. ``None`` (nothing
            uploaded) leaves the leaderboard unchanged.

    Returns:
        The leaderboard as a list of
        ``[username, execution_time, accuracy, status]`` rows.

    Side effects:
        Rebinds the module-level ``leaderboard_data`` and may write the
        uploaded file to disk.
    """
    global leaderboard_data
    columns = ["Username", "Execution Time (s)", "Accuracy", "Status"]

    username = username.strip()
    if not username or file is None:
        return leaderboard_data.values.tolist()

    if isinstance(file, str):
        temp_file_path = file
    else:
        # The username comes straight from a form field: keep path separators
        # and other special characters out of the stored file name.
        safe_name = "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in username)
        temp_file_path = os.path.join(HF_STORAGE_DIR, f"{safe_name}_model.pt")
        with open(temp_file_path, "wb") as temp_file:
            temp_file.write(file.read())

    try:
        exp = read_configuration(dataset["metadata.csv"])
        # Planned sector-based evaluation, not wired in yet:
        # stats_model_sectors = []
        # stats_model_in_out = []
        # for key in exp:
        #     filename = exp[key]['file']
        #     #Groundtruth for each dataset
        #     indoor_gt = exp[key]['indoor']
        #     sectors_gt = exp[key]["sectors"]
        #     print("Dataset: ", filename)
        #     print("Indoor:\t", indoor_gt)
        #     print("Ground Truth sectors:\t", sectors_gt)
        #     #Do clustering for each sector. This will be used later to figure out sector-based and non-sector-based classification
        #     sectors_model = # TODO: CALL USER FTN
        #     print("Estimated sectors:\t", sectors_model)
        #     stats_model_sectors.append(compute_stats_sector(sectors_model, sectors_gt))
        #     stats_model_in_out.append(compute_stats_in_out(sectors_model, indoor_gt))
        #     print("----------------------------")
        # TP = np.mean([x[0] for x in stats_model_sectors])
        # FP = np.mean([x[1] for x in stats_model_sectors])
        # FN = np.mean([x[2] for x in stats_model_sectors])
        # TN = np.mean([x[3] for x in stats_model_sectors])
        # print(TP, FP, FN, TN)
        # TP = np.mean([x[0] for x in stats_model_in_out])
        # FP = np.mean([x[1] for x in stats_model_in_out])
        # FN = np.mean([x[2] for x in stats_model_in_out])
        # TN = np.mean([x[3] for x in stats_model_in_out])
        # print(TP, FP, FN, TN)
        # end_time = time.time()
        # exec_time = end_time - start_time
        # print(f"Execution Time: {exec_time} seconds")
    except Exception as e:
        # Record the failure as a leaderboard row instead of raising.
        error_entry = pd.DataFrame(
            [[username, float("inf"), 0, f"Model Load Error: {str(e)}"]],
            columns=columns,
        )
        leaderboard_data = pd.concat([leaderboard_data, error_entry], ignore_index=True)
        return leaderboard_data.values.tolist()

    # Measure execution time
    start_time = time.time()
    correct = 0
    total = 0

    # Run inference on test dataset
    # NOTE(review): `model` and `test_loader` are not defined anywhere in the
    # visible file, and the file saved at `temp_file_path` is never loaded --
    # confirm where they are meant to come from. If the upload is later loaded
    # with torch.load, remember it is untrusted and unpickling it can execute
    # arbitrary code.
    with torch.no_grad():
        for images, labels in test_loader:
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    execution_time = round(time.time() - start_time, 4)
    # An empty test set scores 0 instead of raising ZeroDivisionError.
    accuracy = round(100 * correct / total, 2) if total else 0.0
    status = "Success" if accuracy > 0 else "Incorrect Model"

    # Append to leaderboard
    new_entry = pd.DataFrame([[username, execution_time, accuracy, status]], columns=columns)
    leaderboard_data = pd.concat([leaderboard_data, new_entry], ignore_index=True)

    # Sort leaderboard: prioritize accuracy first, then execution time
    leaderboard_data = leaderboard_data.sort_values(
        by=["Accuracy", "Execution Time (s)"],
        ascending=[False, True],
    ).reset_index(drop=True)
    return leaderboard_data.values.tolist()
# Create Gradio UI
# NOTE(review): the nesting below was reconstructed from flattened source;
# confirm which widgets belong inside the Row.
with gr.Blocks() as demo:
    gr.Markdown("# 🚀 Model Submission & Leaderboard (Hugging Face Spaces)")

    with gr.Row():
        username_input = gr.Textbox(label="Username")
        file_input = gr.File(label="Upload PyTorch Model (.pt)")
        # NOTE(review): this upload is shown but not passed to the click handler.
        requirements_input = gr.File(label="Upload requirements.txt")
        submit_button = gr.Button("Submit Model")

    leaderboard_display = gr.Dataframe(
        headers=["Username", "Execution Time (s)", "Accuracy", "Status"],
        label="Leaderboard",
    )

    def update_leaderboard(username, file):
        """Click handler: evaluate the submission and return the leaderboard rows."""
        return evaluate_model(username, file)

    submit_button.click(
        fn=update_leaderboard,
        inputs=[username_input, file_input],
        outputs=leaderboard_display,
    )

# Launch Gradio app
demo.launch()