# test / app.py
# gmmcleod's picture
# Create app.py
# 46d89e3 verified
# change the eval ftn to take a list of lists
import gradio as gr
import pandas as pd
import time
import torch
import os
import torchvision.transforms as transforms
from torchvision import datasets
import torch.nn.functional as F
from torch.utils.data import DataLoader
import subprocess
# from dummy_eval import foo
import zipfile
import shutil
import numpy as np
import importlib.util
import inspect
from huggingface_hub import HfApi
from datasets import load_dataset, Dataset
from huggingface_hub import login, hf_hub_download
import requests
import matplotlib
matplotlib.use("Agg")
def fetch_required_files(exp_config):
    """Download the .txt capture for every experiment in *exp_config*.

    Each entry's remote path is fetched from the Flask file server into the
    current directory, and the resulting local path is stored back into the
    entry under "local_file". Raises if any single download fails.
    """
    for scenario, entry in exp_config.items():
        remote_path = entry['file']
        url = f"https://saraht14-server.hf.space/file/{remote_path}.txt"
        target = os.path.join("./", os.path.basename(remote_path) + ".txt")
        downloaded = download_file(url, target)
        if not downloaded:
            raise Exception(f"Could not download file: {remote_path}")
        entry["local_file"] = target
    return exp_config
def call_flask_server(username):
    """Ping the remote Flask server and return its "result" field.

    *username* is currently unused; kept for interface compatibility.
    Any failure (network, non-JSON reply) is reported as an error string
    rather than raised.
    """
    url = "https://saraht14-server.hf.space/"
    try:
        payload = requests.get(url).json()
    except Exception as e:
        print("Failed to contact Flask server:", e)
        return f"Error contacting server: {e}"
    print("Flask response:", payload)
    return payload.get("result", "No result")
# Module-level smoke test: ping the Flask file server once at import time
# (result is discarded; failures are only printed by the helper).
call_flask_server("sarah")
def download_file(url, local_path):
    """Download *url* to *local_path*, authenticating with the module-level
    HF_TOKEN.

    Returns *local_path* on success, or None on any failure (the error is
    printed rather than raised so callers can fall back gracefully).
    """
    try:
        # Bug fix: requests.get without a timeout can hang the whole Space
        # indefinitely if the file server stalls; bound the wait.
        r = requests.get(url,
                         headers={"Authorization": f"Bearer {HF_TOKEN}"},
                         timeout=60)
        r.raise_for_status()
        with open(local_path, 'wb') as f:
            f.write(r.content)
        return local_path
    except Exception as e:
        print(f"Error downloading file from {url}: {e}")
        return None
# def log_submission_request(username, zip_file):
# try:
# requests_ds = load_dataset("IndoorOutdoor/requests", split="train")
# except Exception as e:
# print("Could not load requests dataset, creating a new one.", e)
# requests_ds = Dataset.from_dict({"username": [], "timestamp": [], "zip_filename": []})
# new_entry = {"username": username,
# "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
# "zip_filename": os.path.basename(zip_file.name)}
# updated_requests = requests_ds.add_item(new_entry)
# updated_requests.push_to_hub("IndoorOutdoor/requests", token=HF_TOKEN)
# print("Logged submission request to the requests dataset.")
# def update_results_dataset(leaderboard_df):
# NOTE(review): the assignment below is a leftover from the commented-out
# draft above; it is shadowed by the local repo_id inside
# update_results_dataset() and appears otherwise unused — confirm before
# removing.
repo_id = "saraht14/responses"
def update_results_dataset(new_row_df):
    """Append *new_row_df* (a pandas DataFrame) to the leaderboard dataset
    on the Hub.

    Loads the existing "saraht14/responses" dataset, concatenates the new
    row(s) and pushes the result back. If the dataset does not exist or
    fails to load, a fresh dataset is created from *new_row_df* instead.
    Returns the pushed Dataset, or None if even the fallback push fails.
    """
    repo_id = "saraht14/responses"
    try:
        leaderboard_dataset = load_dataset(repo_id, split="train", token=HF_TOKEN)
        leaderboard_df = leaderboard_dataset.to_pandas()
        updated_df = pd.concat([leaderboard_df, new_row_df], ignore_index=True)
        updated_dataset = Dataset.from_pandas(updated_df)
        updated_dataset.push_to_hub(repo_id, token=HF_TOKEN)
        print("New row(s) added to existing leaderboard dataset.")
        return updated_dataset
    except Exception as e:
        print("Dataset not found or failed to load, creating a new one.")
        try:
            new_dataset = Dataset.from_pandas(new_row_df)
            new_dataset.push_to_hub(repo_id, token=HF_TOKEN)
            # Bug fix: this log line used to sit *after* the return statement
            # and was therefore unreachable.
            print("New leaderboard dataset created and uploaded.")
            return new_dataset
        except Exception as inner_e:
            print("Failed to create and push new leaderboard dataset:", inner_e)
            return None  # make the failure path explicit for callers
# Info to change for your repository
# ----------------------------------
HF_TOKEN = os.environ.get("HF_TOKEN")  # A read/write token for your org
# Security fix: the raw token used to be printed here. Space logs are
# readable, and this token grants write access to the org's datasets —
# log only whether it is configured.
print("HF_TOKEN configured:", HF_TOKEN is not None)
OWNER = "IndoorOutdoor"  # Change to your org - don't forget to create a results and request dataset, with the correct format!
# ----------------------------------
READ_TOKEN = os.environ.get("read_token")  # read-only token for the private metadata dataset
# Warm-up download of one known ground-truth capture at import time
# (also validates that READ_TOKEN works before any submission arrives).
local_file_path = hf_hub_download(repo_id="IndoorOutdoor/metadata",
                                  filename="ali/home/office-gain-50-10-25-2023-16-16-03-dump1090.txt",
                                  repo_type="dataset",
                                  token=READ_TOKEN)
REPO_ID = f"{OWNER}/leaderboard"
QUEUE_REPO = f"{OWNER}/requests"
RESULTS_REPO = f"{OWNER}/results"
# Latest status/error string surfaced to the UI's Status textbox.
global_error_message = "Ready for submission!"

def set_error_message(message):
    """Record *message* as the current status shown in the UI.

    Bug fix: this function was commented out, yet it is still called from
    import_and_run_function() and update_leaderboard() — every error path
    crashed with a NameError instead of reporting the problem. Restored.
    """
    global global_error_message
    global_error_message = message
    print("ERROR UPDATED:", global_error_message)  # Debugging

def get_error_message():
    """Return the most recently recorded status/error message."""
    return global_error_message
def install_requirements(file_path):
    """pip-install every package listed (one per line) in *file_path*.

    Failures are reported on stdout rather than raised: a missing file or a
    failed install aborts the remaining packages. Returns None.
    """
    import sys  # local import: only needed here, keeps the fix self-contained
    try:
        with open(file_path, "r") as file:
            requirements = file.readlines()
        for req in requirements:
            package = req.strip()
            if package:
                # Bug fix: invoke pip via the current interpreter so packages
                # land in the environment this app actually runs in — a bare
                # "pip" on PATH may belong to a different Python.
                subprocess.run([sys.executable, "-m", "pip", "install", package],
                               check=True)
                print(f"Installed: {package}")
        print("All requirements installed successfully.")
    except FileNotFoundError:
        print(f"Error: {file_path} not found.")
    except subprocess.CalledProcessError as e:
        print(f"Installation failed: {e}")
# Leaderboard column order; must match the results dataset schema.
HEADERS = ["Username", "Execution Time (s)", "Accuracy", "TP", "FP", "FN", "TN", "Status"]
# Receiver base-station coordinates (lat, lon) per capture site.
BASE = {'ottawa': (45.30326753851309, -75.93640391349997),
        'ali_home': (37.88560412289598, -122.30218612514359),
        'josh_home': (37.8697406, -122.30218612514359),
        'cory': (37.8697406, -122.281570)}

def get_base(filename):
    """Map a capture *filename* to its base-station (lat, lon).

    Substring checks are applied in order — "home" wins before "josh", so a
    name containing both resolves to 'ali_home' — and anything unmatched
    falls back to the 'cory' site.
    """
    for marker, site in (("home", "ali_home"),
                         ("ottawa", "ottawa"),
                         ("josh", "josh_home")):
        if marker in filename:
            return BASE[site]
    return BASE["cory"]
# Ground-truth experiment table, read once at import time for a quick sanity
# print; read_configuration() re-parses the same file during evaluation.
metadata_path = "metadata.csv"
# NOTE(review): `dir` shadows the builtin dir() and appears unused in this
# file — confirm before removing.
dir = ""
df = pd.read_csv(metadata_path)
print(df.head())
def fetch_lb():
    """Load the current leaderboard from the Hub as a pandas DataFrame.

    Columns are reordered to HEADERS and rows sorted by accuracy (desc),
    ties broken by execution time (asc). On any failure an empty DataFrame
    with the expected columns is returned instead.
    """
    try:
        board = load_dataset("saraht14/responses", split="train", token=HF_TOKEN).to_pandas()
        board = board[HEADERS]  # keep it ordered
        board = board.sort_values(by=["Accuracy", "Execution Time (s)"],
                                  ascending=[False, True])
    except Exception as e:
        print(f"Error loading leaderboard:", e)
        board = pd.DataFrame(columns=HEADERS)
    print(f"THIS IS THE LEADERBOARD:\n{board}")
    return board

# Module-level cache of the board, mutated later by evaluate_model().
leaderboard_data = fetch_lb()
def compute_stats_sector(sectors_model, sector_gt):
    """Score a per-sector prediction against per-sector ground truth.

    A ground-truth-positive sector counts as a true positive when the model
    marks that sector or either circular neighbour; a model-positive sector
    whose ground-truth neighbour is positive is likewise forgiven as TP.
    Returns [TP, FP, FN, TN], each normalised by the sector count.

    Fixes: removed the dead `ignored` counter (never incremented, so the
    divisor was always 8), and generalised the hard-coded 8 to
    len(sector_gt) — behaviour is identical for the 8-sector inputs this
    app uses.
    """
    n = len(sector_gt)
    TP = FP = FN = TN = 0
    for i in range(n):
        if sector_gt[i] == 1:
            # Positive ground truth: accept a hit in the sector itself or a
            # circular neighbour.
            if sectors_model[i] > 0 or sectors_model[(i + 1) % n] > 0 or sectors_model[(i - 1) % n] > 0:
                TP += 1
            else:
                FN += 1
        else:
            if sectors_model[i] > 0:
                # Model fired on a negative sector: forgive it when a
                # neighbouring ground-truth sector is positive.
                if sector_gt[(i - 1) % n] > 0 or sector_gt[(i + 1) % n] > 0:
                    TP += 1
                    continue
                FP += 1
            else:
                TN += 1
    return [TP / n, FP / n, FN / n, TN / n]
def compute_stats_in_out(sectors_model, indoor_gt):
    """Binary indoor/outdoor scoring of a sector prediction.

    Any positive sector means the model predicts "outdoor". Compared with
    the ground-truth *indoor_gt* flag, exactly one of TP/FP/FN/TN fires, so
    the result is a one-hot list [TP, FP, FN, TN].
    """
    predicted_outdoor = any(sectors_model)
    if indoor_gt:
        # Ground truth indoor: an outdoor call is a false positive.
        return [0, 1, 0, 0] if predicted_outdoor else [0, 0, 0, 1]
    # Ground truth outdoor: an outdoor call is a true positive.
    return [1, 0, 0, 0] if predicted_outdoor else [0, 0, 1, 0]
def read_configuration(filename):
    """Parse the experiment metadata CSV.

    Expected columns: file, scenario, indoor, then one TRUE/FALSE flag per
    sector. Returns {scenario: {'sectors': [0/1, ...], 'indoor': bool,
    'file': str}}.

    Fixes: `indoor` used to mix True with 0 (bool vs int) — it is now a
    real bool with identical truthiness; tokens are stripped so Windows
    line endings / stray spaces no longer break the TRUE comparisons.
    """
    print("read config")
    with open(filename, 'r') as fh:
        data = fh.read().split('\n')
    data = data[1:]  # ignore the header
    print("head", data)
    exp = {}
    for line in data:
        line = line.strip()
        if not line:
            continue
        tokens = [t.strip() for t in line.split(',')]
        data_file = tokens[0]
        scenario = tokens[1]
        exp[scenario] = {
            'sectors': [1 if x == "TRUE" else 0 for x in tokens[3:]],
            'indoor': tokens[2] == "TRUE",
            "file": data_file,
        }
    return exp
def evaluate_model(username, file):
    """Run a user's submitted script against every ground-truth capture and
    fold the aggregate scores into the global leaderboard.

    *file* is a path to a Python script exposing `evaluate(filepath)` (see
    import_and_run_function). Returns the leaderboard as a list of row
    lists for the Gradio Dataframe. Mutates the module-level
    `leaderboard_data` as a side effect; per-run failures are recorded as
    an error row rather than raised.
    """
    print("evaluating...")
    global leaderboard_data
    username = username.strip()
    if not username:
        # No username supplied: return the board unchanged.
        return leaderboard_data.values.tolist()
    # NOTE(review): script_path and the submissions dir are prepared but —
    # with the original copy logic commented out — never written to; the
    # uploaded script is used directly via `file`.
    script_path = f"submissions/{username}.py"
    os.makedirs("submissions", exist_ok=True)
    try:
        # Ground-truth table: {scenario: {'file', 'indoor', 'sectors'}}.
        exp = read_configuration("metadata.csv")
        print(f"FIRST: {len(exp)}")
        # exp = fetch_required_files(exp)
        start_time = time.time()
        stats_model_sectors = []
        stats_model_in_out = []
        for key in exp:
            filename = exp[key]['file']
            indoor_gt = exp[key]['indoor']
            sectors_gt = exp[key]["sectors"]
            filename = filename + ".txt"
            print("FILE TO PROCESS:", filename)
            # Pull the raw capture from the private metadata dataset.
            local_file_path = hf_hub_download(repo_id="IndoorOutdoor/metadata",
                                              filename=filename,
                                              repo_type="dataset",
                                              token=READ_TOKEN)
            # Run the user's evaluate() on the capture; expected to yield a
            # list of 8 0/1 sector flags, or None on validation failure.
            sectors_model = import_and_run_function(file, "evaluate", local_file_path)
            try:
                # Best-effort cleanup of the downloaded capture.
                os.remove(local_file_path)
            except Exception as e:
                print(f"Warning: Couldn't delete {local_file_path}{e}")
            print(f"TYPE: {type(sectors_model), {type(sectors_model[0])}}")
            print("SECTORS MODEL: ", sectors_model)
            # NOTE(review): if sectors_model is None these calls raise, and
            # the run is recorded as a "Model Error" row by the except below.
            stats_model_sectors.append(compute_stats_sector(sectors_model, sectors_gt))
            stats_model_in_out.append(compute_stats_in_out(sectors_model, indoor_gt))
        execution_time = round(time.time() - start_time, 4)
        print("calculating summary stats")
        # Mean per-capture normalised confusion entries.
        # NOTE(review): stats_model_in_out is collected but never used in
        # the summary — confirm whether the binary score should be reported.
        TP = np.mean([x[0] for x in stats_model_sectors])
        FP = np.mean([x[1] for x in stats_model_sectors])
        FN = np.mean([x[2] for x in stats_model_sectors])
        TN = np.mean([x[3] for x in stats_model_sectors])
        print("calculating exec stats")
        accuracy = round((TP + TN) / (TP + TN + FP + FN), 2)
        status = "Success" if accuracy > 0 else "Incorrect Model"
    except Exception as e:
        # Any failure becomes a leaderboard row flagging the model error.
        leaderboard_data = pd.concat([leaderboard_data, pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1, f"Model Error: {str(e)}"]],
                                     columns=HEADERS)], ignore_index=True)
        return leaderboard_data.values.tolist()
    print("calculating new entry")
    new_entry = pd.DataFrame([[username, execution_time, accuracy, TP, FP, FN, TN, status]],
                             columns=HEADERS)
    print("updating new entry")
    # Push the new row to the Hub; returns a Dataset, or None on failure.
    leaderboard_data = update_results_dataset(new_entry)
    # leaderboard_data = pd.concat([leaderboard_data, new_entry], ignore_index=True)
    leaderboard_data = leaderboard_data.to_pandas() if leaderboard_data is not None else None
    if leaderboard_data is not None:
        leaderboard_data = leaderboard_data.sort_values(by=["Accuracy", "Execution Time (s)"], ascending=[False, True]).reset_index(drop=True)
    print(f"DATA: {leaderboard_data}")
    # NOTE(review): if the Hub push failed, leaderboard_data is None and
    # this line raises AttributeError — confirm the intended fallback.
    return leaderboard_data.values.tolist()
def import_and_run_function(script_path, function_name, filename):
    """Dynamically import *script_path* and run its *function_name*(filename).

    Validates every step — file exists, is a .py, imports cleanly, defines
    the function, takes exactly one positional parameter, and returns a
    list of exactly 8 integers that are each 0 or 1. On any violation the
    problem is recorded via set_error_message() and None is returned;
    otherwise the validated list is returned.
    """
    if not os.path.exists(script_path):
        set_error_message(f"Error: {script_path} not found.")
        return None
    if not script_path.endswith(".py"):
        set_error_message("Error: Provided file is not a Python script.")
        return None
    module_name = os.path.splitext(os.path.basename(script_path))[0]
    try:
        # Import the user script as an ad-hoc module without touching sys.path.
        spec = importlib.util.spec_from_file_location(module_name, script_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    except SyntaxError as e:
        set_error_message(f"Error: Syntax error in the script - {e}")
        return None
    except ImportError as e:
        set_error_message(f"Error: Import issue in the script - {e}")
        return None
    except Exception as e:
        set_error_message(f"Error: Failed to import script - {e}")
        return None
    if not hasattr(module, function_name):
        set_error_message(f"Error: Function '{function_name}' not found in '{script_path}'.")
        return None
    function_to_run = getattr(module, function_name)
    try:
        sig = inspect.signature(function_to_run)
        params = list(sig.parameters.values())
        if len(params) != 1 or params[0].kind not in [inspect.Parameter.POSITIONAL_OR_KEYWORD]:
            set_error_message(f"Error: Function '{function_name}' must have exactly one parameter (filepath).")
            return None
    except Exception as e:
        set_error_message(f"Error: Unable to inspect function signature - {e}")
        return None
    try:
        result = function_to_run(filename)
        print(f"TYPE: {type(result), {type(result[0])}}, RESULT: {result}")
    except Exception as e:
        set_error_message(f"Error: Function '{function_name}' raised an error during execution - {e}")
        return None
    if not isinstance(result, list):
        set_error_message(f"Error: Function '{function_name}' must return a list.")
        return None
    if len(result) != 8:
        set_error_message(f"Error: Function '{function_name}' must return a list of exactly 8 elements.")
        return None
    if not all(isinstance(x, int) and x in [0, 1] for x in result):
        # Bug fix: this path used to `return (msg, None)` — a tuple — unlike
        # every other error path. Callers treat any non-None return as the
        # sector list, so the tuple would have crashed scoring downstream.
        set_error_message(f"Error: Function '{function_name}' must return a list of 8 integers, each 0 or 1.")
        return None
    print(f"Function '{function_name}' executed successfully. Output: {result}")
    return result
def update_leaderboard(username, zip_file):
    """Handle one submission end-to-end: unzip, install the user's
    requirements, then evaluate their main.py.

    The ZIP must contain user_reqs.txt and main.py. Returns a
    (status_message, leaderboard_rows_or_None) pair for the Gradio outputs.
    """
    if not zip_file:
        set_error_message("No file uploaded.")
        return get_error_message(), None
    zip_path = zip_file.name
    # Extract into a per-user directory under the working dir.
    extract_path = os.path.join("", username)
    try:
        if not os.path.exists(extract_path):
            os.makedirs(extract_path)
    except OSError:
        set_error_message("Error creating directory for extraction.")
        return get_error_message(), None
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_path)
    except zipfile.BadZipFile:
        return "Invalid ZIP file.", None
    except Exception as e:
        return f"Error extracting ZIP file: {str(e)}", None
    extracted_files = os.listdir(extract_path)
    print("EXTRACTED FILES:", extracted_files)
    req_file = os.path.join(extract_path, "user_reqs.txt")
    if "user_reqs.txt" not in extracted_files:
        return "Missing user_reqs.txt in ZIP file.", None
    try:
        install_requirements(req_file)
    except Exception as e:
        return f"Error installing dependencies: {str(e)}", None
    python_script = os.path.join(extract_path, "main.py")
    # Bug fix: this membership check appeared twice verbatim; once suffices.
    if "main.py" not in extracted_files:
        return "No Python script (main.py) found in ZIP.", None
    try:
        updated_leaderboard = evaluate_model(username, python_script)
    except Exception as e:
        print("Error in eval mode:", str(e))
        return f"Error evaluating model: {str(e)}", None
    return "Submission successful!", updated_leaderboard
# --- Gradio UI --------------------------------------------------------------
# NOTE(review): component nesting below is reconstructed from a source whose
# indentation was lost; confirm which widgets belong inside each Row.
with gr.Blocks() as demo:
    gr.Markdown("# 🚀 Model Submission & Leaderboard (Hugging Face Spaces)")
    with gr.Row():
        username_input = gr.Textbox(label="Username")
        file_input = gr.File(label="Upload Zip File")
    submit_button = gr.Button("Submit File")
    status_output = gr.Textbox(label="Status", interactive=False)
    with gr.Row():
        # Leaderboard table; `value=fetch_lb` makes Gradio call the loader
        # to populate the table.
        leaderboard_display = gr.Dataframe(
            headers=HEADERS,
            value=fetch_lb,
            label="Leaderboard"
        )
    # A submission updates both the status text and the leaderboard table.
    submit_button.click(fn=update_leaderboard,
                        inputs=[username_input, file_input],
                        outputs=[status_output, leaderboard_display])
    # NOTE(review): this listener writes status_output back into itself on
    # every change, which can re-trigger the event — confirm intended.
    status_output.change(fn=get_error_message, inputs=[], outputs=status_output)
demo.launch()