# change the eval ftn to take a list of lists import gradio as gr import pandas as pd import time import torch import os import torchvision.transforms as transforms from torchvision import datasets import torch.nn.functional as F from torch.utils.data import DataLoader import subprocess # from dummy_eval import foo import zipfile import shutil import numpy as np import importlib.util import inspect from huggingface_hub import HfApi from datasets import load_dataset, Dataset from huggingface_hub import login, hf_hub_download import requests import matplotlib matplotlib.use("Agg") def fetch_required_files(exp_config): # os.makedirs("temp_data", exist_ok=True) for key in exp_config: file_path = exp_config[key]['file'] url = f"https://saraht14-server.hf.space/file/{file_path}.txt" filename_only = os.path.basename(file_path) + ".txt" local_path = os.path.join("./", filename_only) downloaded = download_file(url, local_path) if not downloaded: raise Exception(f"Could not download file: {file_path}") exp_config[key]["local_file"] = local_path return exp_config def call_flask_server(username): url = "https://saraht14-server.hf.space/" try: response = requests.get(url) result = response.json() print("Flask response:", result) return result.get("result", "No result") except Exception as e: print("Failed to contact Flask server:", e) return f"Error contacting server: {e}" call_flask_server("sarah") def download_file(url, local_path): try: r = requests.get(url, headers={"Authorization": f"Bearer {HF_TOKEN}"}) r.raise_for_status() with open(local_path, 'wb') as f: f.write(r.content) return local_path except Exception as e: print(f"Error downloading file from {url}: {e}") return None # def log_submission_request(username, zip_file): # try: # requests_ds = load_dataset("IndoorOutdoor/requests", split="train") # except Exception as e: # print("Could not load requests dataset, creating a new one.", e) # requests_ds = Dataset.from_dict({"username": [], "timestamp": [], "zip_filename": []}) # new_entry = {"username": username, # "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), # "zip_filename": os.path.basename(zip_file.name)} # updated_requests = requests_ds.add_item(new_entry) # updated_requests.push_to_hub("IndoorOutdoor/requests", token=HF_TOKEN) # print("Logged submission request to the requests dataset.") # def update_results_dataset(leaderboard_df): repo_id = "saraht14/responses" def update_results_dataset(new_row_df): repo_id = "saraht14/responses" try: leaderboard_dataset = load_dataset(repo_id, split="train", token=HF_TOKEN) leaderboard_df = leaderboard_dataset.to_pandas() updated_df = pd.concat([leaderboard_df, new_row_df], ignore_index=True) updated_dataset = Dataset.from_pandas(updated_df) updated_dataset.push_to_hub(repo_id, token=HF_TOKEN) print("New row(s) added to existing leaderboard dataset.") return updated_dataset except Exception as e: print("Dataset not found or failed to load, creating a new one.") try: new_dataset = Dataset.from_pandas(new_row_df) new_dataset.push_to_hub(repo_id, token=HF_TOKEN) return new_dataset print("New leaderboard dataset created and uploaded.") except Exception as inner_e: print("Failed to create and push new leaderboard dataset:", inner_e) # Info to change for your repository # ---------------------------------- HF_TOKEN = os.environ.get("HF_TOKEN") # A read/write token for your org print(f"{HF_TOKEN}") OWNER = "IndoorOutdoor" # Change to your org - don't forget to create a results and request dataset, with the correct format! # ---------------------------------- READ_TOKEN = os.environ.get("read_token") local_file_path = hf_hub_download(repo_id="IndoorOutdoor/metadata", filename="ali/home/office-gain-50-10-25-2023-16-16-03-dump1090.txt", repo_type="dataset", token=READ_TOKEN) REPO_ID = f"{OWNER}/leaderboard" QUEUE_REPO = f"{OWNER}/requests" RESULTS_REPO = f"{OWNER}/results" global_error_message = "Ready for submission!" # def set_error_message(message): # global global_error_message # global_error_message = message # print("ERROR UPDATED:", global_error_message) # Debugging def get_error_message(): return global_error_message def install_requirements(file_path): try: with open(file_path, "r") as file: requirements = file.readlines() for req in requirements: package = req.strip() if package: subprocess.run(["pip", "install", package], check=True) print(f"Installed: {package}") print("All requirements installed successfully.") except FileNotFoundError: print(f"Error: {file_path} not found.") except subprocess.CalledProcessError as e: print(f"Installation failed: {e}") HEADERS = ["Username", "Execution Time (s)", "Accuracy", "TP", "FP", "FN", "TN", "Status"] BASE = {'ottawa':(45.30326753851309,-75.93640391349997), 'ali_home':(37.88560412289598,-122.30218612514359), 'josh_home':(37.8697406, -122.30218612514359), 'cory':(37.8697406,-122.281570)} def get_base(filename): if "home" in filename: return BASE["ali_home"] elif "ottawa" in filename: return BASE["ottawa"] elif "josh" in filename: return BASE["josh_home"] else: return BASE["cory"] metadata_path = "metadata.csv" dir = "" df = pd.read_csv(metadata_path) print(df.head()) def fetch_lb(): try: leaderboard_dataset = load_dataset("saraht14/responses", split="train", token=HF_TOKEN) leaderboard_data = leaderboard_dataset.to_pandas() leaderboard_data = leaderboard_data[HEADERS] # keep it ordered leaderboard_data = leaderboard_data.sort_values(by=["Accuracy", "Execution Time (s)"], ascending=[False, True]) except Exception as e: print(f"Error loading leaderboard:", e) leaderboard_data = pd.DataFrame(columns=HEADERS) print(f"THIS IS THE LEADERBOARD:\n{leaderboard_data}") return leaderboard_data leaderboard_data = fetch_lb() def compute_stats_sector(sectors_model, sector_gt): TP = FP = FN = TN = 0 ignored = 0 for i in range(len(sector_gt)): if sector_gt[i] == 1: if sectors_model[i] > 0 or sectors_model[(i+1) % 8] > 0 or sectors_model[(i-1) % 8] > 0 : TP += 1 else: FN += 1 else: if sectors_model[i] > 0: if sector_gt[(i-1) % 8] > 0 or sector_gt[(i+1) % 8] > 0: TP += 1 continue FP += 1 else: TN += 1 NUM_SECTORS = 8 - ignored return [TP / NUM_SECTORS, FP / NUM_SECTORS, FN / NUM_SECTORS, TN / NUM_SECTORS] #Compare the model output with ground truth #return TP, FP, FN, TN #This fuction compute stats when the model is binary i.e., outputs only indoor vs outdoor def compute_stats_in_out(sectors_model, indoor_gt): if indoor_gt: #if groundtruth is indoor for i in range(len(sectors_model)): if sectors_model[i]: return [0,1,0,0] return [0,0,0,1] else: #if outdoor for i in range(len(sectors_model)): if sectors_model[i]: return [1,0,0,0] return [0,0,1,0] def read_configuration(filename): print("read config") with open(filename, 'r') as file: data = file.read().split('\n') data = data[1:] #ignore the header print("head", data) exp = {} for line in data: if len(line) == 0: continue tokens =line.split(',') file = tokens[0] scenario = tokens[1] indoor = True if tokens[2] == "TRUE" else 0 exp[scenario] = {'sectors':[1 if x == "TRUE" else 0 for x in tokens[3:]], 'indoor':indoor, "file":file} return exp def evaluate_model(username, file): print("evaluating...") global leaderboard_data username = username.strip() if not username: return leaderboard_data.values.tolist() script_path = f"submissions/{username}.py" os.makedirs("submissions", exist_ok=True) # # Get the file path from the NamedString object # file_path = file.name # Get the actual file path # print("file_path:", file_path) # with open(script_path, "wb") as f: # with open(file_path, "rb") as uploaded_file: # f.write(uploaded_file.read()) # script_path = f"submissions/{username}.py" # os.makedirs("submissions", exist_ok=True) # with open(script_path, "wb") as f: # f.write(file.read()) try: exp = read_configuration("metadata.csv") print(f"FIRST: {len(exp)}") # exp = fetch_required_files(exp) # print(f"SECOND: {len(exp)}") start_time = time.time() stats_model_sectors = [] stats_model_in_out = [] for key in exp: filename = exp[key]['file'] indoor_gt = exp[key]['indoor'] sectors_gt = exp[key]["sectors"] # file_path = os.path.join(dataset_directory, filename) # print(file_path) filename = filename + ".txt" print("FILE TO PROCESS:", filename) # filename_url = f"https://saraht14-server.hf.space/file/{filename}" # local_txt_path = f"./{filename}.txt" # os.makedirs("temp_data", exist_ok=True) # local_file_path = exp[key]["local_file"] # downloaded = download_file(filename_url, local_txt_path) local_file_path = hf_hub_download(repo_id="IndoorOutdoor/metadata", filename=filename, repo_type="dataset", token=READ_TOKEN) # if not downloaded: # raise Exception("Failed to fetch remote file.") # sectors_model = subprocess.run(["python", script_path,filename], capture_output=True, text=True, timeout=300) # hello = foo() # print(f"HELLO: {hello}") # import sectors_model = import_and_run_function(file, "evaluate", local_file_path) try: os.remove(local_file_path) except Exception as e: print(f"Warning: Couldn't delete {local_file_path} — {e}") # print(status) print(f"TYPE: {type(sectors_model), {type(sectors_model[0])}}") print("SECTORS MODEL: ", sectors_model) # sectors_model = eval(filename) # print(sectors_model) # sectors_model = model_based_clustering(dataset_directory, filename) stats_model_sectors.append(compute_stats_sector(sectors_model, sectors_gt)) stats_model_in_out.append(compute_stats_in_out(sectors_model, indoor_gt)) execution_time = round(time.time() - start_time, 4) print("calculating summary stats") TP = np.mean([x[0] for x in stats_model_sectors]) FP = np.mean([x[1] for x in stats_model_sectors]) FN = np.mean([x[2] for x in stats_model_sectors]) TN = np.mean([x[3] for x in stats_model_sectors]) print("calculating exec stats") accuracy = round((TP + TN) / (TP + TN + FP + FN), 2) status = "Success" if accuracy > 0 else "Incorrect Model" # ["Username", "Execution Time (s)", "Accuracy", "True Positive", "False Positive", "False Negative", "False Positive", "Status"] except Exception as e: leaderboard_data = pd.concat([leaderboard_data, pd.DataFrame([[username, float("inf"), 0,-1,-1,-1,-1, f"Model Error: {str(e)}"]], columns=HEADERS)], ignore_index=True) return leaderboard_data.values.tolist() print("calculating new entry") new_entry = pd.DataFrame([[username, execution_time, accuracy, TP, FP, FN, TN, status]], columns=HEADERS) print("updating new entry") leaderboard_data = update_results_dataset(new_entry) # leaderboard_data = pd.concat([leaderboard_data, new_entry], ignore_index=True) leaderboard_data = leaderboard_data.to_pandas() if leaderboard_data is not None else None if leaderboard_data is not None: leaderboard_data = leaderboard_data.sort_values(by=["Accuracy", "Execution Time (s)"], ascending=[False, True]).reset_index(drop=True) print(f"DATA: {leaderboard_data}") return leaderboard_data.values.tolist() def import_and_run_function(script_path, function_name, filename): if not os.path.exists(script_path): set_error_message(f"Error: {script_path} not found.") return None if not script_path.endswith(".py"): set_error_message("Error: Provided file is not a Python script.") return None module_name = os.path.splitext(os.path.basename(script_path))[0] try: spec = importlib.util.spec_from_file_location(module_name, script_path) module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) except SyntaxError as e: set_error_message(f"Error: Syntax error in the script - {e}") return None except ImportError as e: set_error_message(f"Error: Import issue in the script - {e}") return None except Exception as e: set_error_message(f"Error: Failed to import script - {e}") return None if not hasattr(module, function_name): set_error_message(f"Error: Function '{function_name}' not found in '{script_path}'.") return None function_to_run = getattr(module, function_name) try: sig = inspect.signature(function_to_run) params = list(sig.parameters.values()) if len(params) != 1 or params[0].kind not in [inspect.Parameter.POSITIONAL_OR_KEYWORD]: set_error_message(f"Error: Function '{function_name}' must have exactly one parameter (filepath).") return None except Exception as e: set_error_message(f"Error: Unable to inspect function signature - {e}") return None try: result = function_to_run(filename) print(f"TYPE: {type(result), {type(result[0])}}, RESULT: {result}") except Exception as e: set_error_message(f"Error: Function '{function_name}' raised an error during execution - {e}") return None if not isinstance(result, list): set_error_message(f"Error: Function '{function_name}' must return a list.") return None if len(result) != 8: set_error_message(f"Error: Function '{function_name}' must return a list of exactly 8 elements.") return None if not all(isinstance(x, int) and x in [0, 1] for x in result): return f"Error: Function '{function_name}' must return a list of 8 integers, each 0 or 1.", None print(f"Function '{function_name}' executed successfully. Output: {result}") # set_error_message(f"Function '{function_name}' executed successfully.") return result def update_leaderboard(username, zip_file): if not zip_file: set_error_message("No file uploaded.") return get_error_message(), None zip_path = zip_file.name extract_path = os.path.join("", username) # if not os.path.exists(extract_path): # os.makedirs(extract_path) try: if not os.path.exists(extract_path): os.makedirs(extract_path) except OSError: set_error_message("Error creating directory for extraction.") return get_error_message(), None try: with zipfile.ZipFile(zip_path, "r") as zip_ref: zip_ref.extractall(extract_path) except zipfile.BadZipFile: return "Invalid ZIP file.", None except Exception as e: return f"Error extracting ZIP file: {str(e)}", None extracted_files = os.listdir(extract_path) print("EXTRACTED FILES:", extracted_files) req_file = os.path.join(extract_path, "user_reqs.txt") if "user_reqs.txt" not in extracted_files: return "Missing user_reqs.txt in ZIP file.", None try: install_requirements(req_file) except Exception as e: return f"Error installing dependencies: {str(e)}", None # for file in os.listdir(extract_path): # if file.endswith(".py"): # python_script = os.path.join(extract_path, file) # break python_script = os.path.join(extract_path, "main.py") if "main.py" not in extracted_files: return "No Python script (main.py) found in ZIP.", None # if not python_script: # return "No Python script found in ZIP." if "main.py" not in extracted_files: return "No Python script (main.py) found in ZIP.", None try: updated_leaderboard = evaluate_model(username, python_script) except Exception as e: print("Error in eval mode:", str(e)) return f"Error evaluating model: {str(e)}", None # log_submission_request(username, zip_file) return "Submission successful!", updated_leaderboard with gr.Blocks() as demo: gr.Markdown("# 🚀 Model Submission & Leaderboard (Hugging Face Spaces)") with gr.Row(): username_input = gr.Textbox(label="Username") file_input = gr.File(label="Upload Zip File") submit_button = gr.Button("Submit File") status_output = gr.Textbox(label="Status", interactive=False) with gr.Row(): leaderboard_display = gr.Dataframe( headers=HEADERS, value=fetch_lb, label="Leaderboard" ) submit_button.click(fn=update_leaderboard, inputs=[username_input, file_input], outputs=[status_output, leaderboard_display]) status_output.change(fn=get_error_message, inputs=[], outputs=status_output) demo.launch()