dummy / app.py
gmmcleod's picture
change error statements
3d15ec4 verified
import gradio as gr
import pandas as pd
import time
import torch
import os
import torchvision.transforms as transforms
from torchvision import datasets
import torch.nn.functional as F
from torch.utils.data import DataLoader
import subprocess
# from dummy_eval import foo
import zipfile
import shutil
import numpy as np
import importlib.util
import inspect
from huggingface_hub import HfApi
from datasets import load_dataset, Dataset
from huggingface_hub import login
def log_submission_request(username, zip_file):
try:
requests_ds = load_dataset("IndoorOutdoor/requests", split="train")
except Exception as e:
print("Could not load requests dataset, creating a new one.", e)
requests_ds = Dataset.from_dict({"model name": [], "timestamp": [], "zip_filename": []})
new_entry = {"model name": username,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"zip_filename": os.path.basename(zip_file.name)}
updated_requests = requests_ds.add_item(new_entry)
updated_requests.push_to_hub("IndoorOutdoor/requests", token=HF_TOKEN)
print("Logged submission request to the requests dataset.")
def update_results_dataset(leaderboard_df):
try:
leaderboard_dataset = Dataset.from_pandas(leaderboard_df)
leaderboard_dataset.push_to_hub("IndoorOutdoor/results", token=HF_TOKEN)
print("Leaderboard dataset updated on the Hub.")
except Exception as e:
print("Failed to update leaderboard dataset:", e)
# Info to change for your repository
# ----------------------------------
HF_TOKEN = os.environ.get("HF_TOKEN") # A read/write token for your org
print(f"{HF_TOKEN}")
OWNER = "IndoorOutdoor" # Change to your org - don't forget to create a results and request dataset, with the correct format!
# ----------------------------------
REPO_ID = f"{OWNER}/leaderboard"
QUEUE_REPO = f"{OWNER}/requests"
RESULTS_REPO = f"{OWNER}/results"
global_error_message = "Ready for submission!"
def set_error_message(message):
global global_error_message
global_error_message = message
print("ERROR UPDATED:", global_error_message) # Debugging
def get_error_message():
return global_error_message
def install_requirements(file_path):
try:
with open(file_path, "r") as file:
requirements = file.readlines()
for req in requirements:
package = req.strip()
if package:
subprocess.run(["pip", "install", package], check=True)
print(f"Installed: {package}")
print("All requirements installed successfully.")
except FileNotFoundError:
print(f"Error: {file_path} not found.")
except subprocess.CalledProcessError as e:
print(f"Installation failed: {e}")
HEADERS = ["Model Name", "Execution Time (s)", "Accuracy", "TP", "FP", "FN", "FP"]
BASE = {'ottawa':(45.30326753851309,-75.93640391349997),
'ali_home':(37.88560412289598,-122.30218612514359),
'josh_home':(37.8697406, -122.30218612514359),
'cory':(37.8697406,-122.281570)}
def get_base(filename):
if "home" in filename:
return BASE["ali_home"]
elif "ottawa" in filename:
return BASE["ottawa"]
elif "josh" in filename:
return BASE["josh_home"]
else:
return BASE["cory"]
metadata_path = "metadata.csv"
dir = ""
df = pd.read_csv(metadata_path)
print(df.head())
def compute_stats_sector(sectors_model, sector_gt):
TP = FP = FN = TN = 0
ignored = 0
for i in range(len(sector_gt)):
if sector_gt[i] == 1:
if sectors_model[i] > 0 or sectors_model[(i+1) % 8] > 0 or sectors_model[(i-1) % 8] > 0 :
TP += 1
else:
FN += 1
else:
if sectors_model[i] > 0:
if sector_gt[(i-1) % 8] > 0 or sector_gt[(i+1) % 8] > 0:
TP += 1
continue
FP += 1
else:
TN += 1
NUM_SECTORS = 8 - ignored
return [TP / NUM_SECTORS, FP / NUM_SECTORS, FN / NUM_SECTORS, TN / NUM_SECTORS]
#Compare the model output with ground truth
#return TP, FP, FN, TN
#This fuction compute stats when the model is binary i.e., outputs only indoor vs outdoor
def compute_stats_in_out(sectors_model, indoor_gt):
if indoor_gt: #if groundtruth is indoor
for i in range(len(sectors_model)):
if sectors_model[i]:
return [0,1,0,0]
return [0,0,0,1]
else: #if outdoor
for i in range(len(sectors_model)):
if sectors_model[i]:
return [1,0,0,0]
return [0,0,1,0]
def read_configuration(filename):
print("read config")
with open(filename, 'r') as file:
data = file.read().split('\n')
data = data[1:] #ignore the header
print("head", data)
exp = {}
for line in data:
if len(line) == 0:
continue
tokens =line.split(',')
file = tokens[0]
scenario = tokens[1]
indoor = True if tokens[2] == "TRUE" else 0
exp[scenario] = {'sectors':[1 if x == "TRUE" else 0 for x in tokens[3:]], 'indoor':indoor, "file":file}
return exp
leaderboard_data = pd.DataFrame(columns=HEADERS)
def evaluate_model(username, file):
print("evaluating...")
global leaderboard_data
username = username.strip()
if not username:
return leaderboard_data.values.tolist()
script_path = f"submissions/{username}.py"
os.makedirs("submissions", exist_ok=True)
# # Get the file path from the NamedString object
# file_path = file.name # Get the actual file path
# print("file_path:", file_path)
# with open(script_path, "wb") as f:
# with open(file_path, "rb") as uploaded_file:
# f.write(uploaded_file.read())
# script_path = f"submissions/{username}.py"
# os.makedirs("submissions", exist_ok=True)
# with open(script_path, "wb") as f:
# f.write(file.read())
start_time = time.time()
try:
exp = read_configuration("metadata.csv")
stats_model_sectors = []
stats_model_in_out = []
for key in exp:
filename = exp[key]['file']
indoor_gt = exp[key]['indoor']
sectors_gt = exp[key]["sectors"]
# file_path = os.path.join(dataset_directory, filename)
# print(file_path)
filename = filename + ".txt"
print("FILE TO PROCESS:", filename)
# sectors_model = subprocess.run(["python", script_path,filename], capture_output=True, text=True, timeout=300)
# hello = foo()
# print(f"HELLO: {hello}")
# import
sectors_model = import_and_run_function(file, "evaluate", filename)
# print(status)
print(f"TYPE: {type(sectors_model), {type(sectors_model[0])}}")
print("SECTORS MODEL: ", sectors_model)
# sectors_model = eval(filename)
# print(sectors_model)
# sectors_model = model_based_clustering(dataset_directory, filename)
stats_model_sectors.append(compute_stats_sector(sectors_model, sectors_gt))
stats_model_in_out.append(compute_stats_in_out(sectors_model, indoor_gt))
execution_time = round(time.time() - start_time, 4)
print("calculating summary stats")
TP = np.mean([x[0] for x in stats_model_sectors])
FP = np.mean([x[1] for x in stats_model_sectors])
FN = np.mean([x[2] for x in stats_model_sectors])
TN = np.mean([x[3] for x in stats_model_sectors])
print("calculating exec stats")
accuracy = round((TP + TN) / (TP + TN + FP + FN), 2)
# status = "Success" if accuracy > 0 else "Incorrect Model"
# ["Username", "Execution Time (s)", "Accuracy", "True Positive", "False Positive", "False Negative", "False Positive", "Status"]
except Exception as e:
leaderboard_data = pd.concat([leaderboard_data, pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]],
columns=HEADERS)], ignore_index=True)
return leaderboard_data.values.tolist()
print("calculating new entry")
new_entry = pd.DataFrame([[username, execution_time, accuracy, TP, FP, FN, TN]],
columns=HEADERS)
print("updating new entry")
leaderboard_data = pd.concat([leaderboard_data, new_entry], ignore_index=True)
leaderboard_data = leaderboard_data.sort_values(by=["Accuracy", "Execution Time (s)"],
ascending=[False, True]).reset_index(drop=True)
return leaderboard_data.values.tolist()
def import_and_run_function(script_path, function_name, filename):
if not os.path.exists(script_path):
set_error_message(f"Error: {script_path} not found.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
if not script_path.endswith(".py"):
set_error_message("Error: Provided file is not a Python script.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
module_name = os.path.splitext(os.path.basename(script_path))[0]
try:
spec = importlib.util.spec_from_file_location(module_name, script_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
except SyntaxError as e:
set_error_message(f"Error: Syntax error in the script - {e}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
except ImportError as e:
set_error_message(f"Error: Import issue in the script - {e}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
except Exception as e:
set_error_message(f"Error: Failed to import script - {e}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
if not hasattr(module, function_name):
set_error_message(f"Error: Function '{function_name}' not found in '{script_path}'.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
function_to_run = getattr(module, function_name)
try:
sig = inspect.signature(function_to_run)
params = list(sig.parameters.values())
if len(params) != 1 or params[0].kind not in [inspect.Parameter.POSITIONAL_OR_KEYWORD]:
set_error_message(f"Error: Function '{function_name}' must have exactly one parameter (filepath).")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
except Exception as e:
set_error_message(f"Error: Unable to inspect function signature - {e}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
try:
result = function_to_run(filename)
print(f"TYPE: {type(result), {type(result[0])}}, RESULT: {result}")
except Exception as e:
set_error_message(f"Error: Function '{function_name}' raised an error during execution - {e}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
if not isinstance(result, list):
set_error_message(f"Error: Function '{function_name}' must return a list.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
if len(result) != 8:
set_error_message(f"Error: Function '{function_name}' must return a list of exactly 8 elements.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
if not all(isinstance(x, int) and x in [0, 1] for x in result):
set_error_message(f"Error: Function '{function_name}' must return a list of 8 integers, each 0 or 1.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
print(f"Function '{function_name}' executed successfully. Output: {result}")
# set_error_message(f"Function '{function_name}' executed successfully.")
return result
def update_leaderboard(username, zip_file):
if username is None and zip_file is None:
return "Ready for Submision", []
if not zip_file:
set_error_message("No file uploaded.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
zip_path = zip_file.name
extract_path = os.path.join("", username)
# if not os.path.exists(extract_path):
# os.makedirs(extract_path)
try:
if not os.path.exists(extract_path):
os.makedirs(extract_path)
except OSError:
set_error_message("Error creating directory for extraction.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
try:
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(extract_path)
except zipfile.BadZipFile:
set_error_message("Invalid ZIP file.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
except Exception as e:
set_error_message(f"Error extracting ZIP file: {str(e)}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
extracted_files = os.listdir(extract_path)
print("EXTRACTED FILES:", extracted_files)
req_file = os.path.join(extract_path, "user_reqs.txt")
if "user_reqs.txt" not in extracted_files:
set_error_message("Missing user_reqs.txt in ZIP file.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
try:
install_requirements(req_file)
except Exception as e:
set_error_message(f"Error installing dependencies: {str(e)}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
# for file in os.listdir(extract_path):
# if file.endswith(".py"):
# python_script = os.path.join(extract_path, file)
# break
python_script = os.path.join(extract_path, "main.py")
if "main.py" not in extracted_files:
set_error_message("No Python script (main.py) found in ZIP.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
# if not python_script:
# return "No Python script found in ZIP."
if "main.py" not in extracted_files:
set_error_message("No Python script (main.py) found in ZIP.")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
try:
updated_leaderboard = evaluate_model(username, python_script)
except Exception as e:
print("Error in eval mode:", str(e))
set_error_message(f"Error evaluating model: {str(e)}")
return get_error_message(), pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1]])
# log_submission_request(username, zip_file)
set_error_message("Submission successful!")
return "Submission successful!", updated_leaderboard
with gr.Blocks() as demo:
gr.Markdown("# 🚀 Model Submission & Leaderboard (Hugging Face Spaces)")
with gr.Row():
username_input = gr.Textbox(label="Model Name")
file_input = gr.File(label="Upload Zip File")
submit_button = gr.Button("Submit File")
status_output = gr.Textbox(label="Status", interactive=False)
with gr.Row():
leaderboard_display = gr.Dataframe(headers=HEADERS, label="Leaderboard")
submit_button.click(fn=update_leaderboard,
inputs=[username_input, file_input],
outputs=[status_output, leaderboard_display])
status_output.change(fn=get_error_message, inputs=[], outputs=status_output)
demo.launch()