Spaces:
Sleeping
Sleeping
# change the eval ftn to take a list of lists
# --- standard library ---
import importlib.util
import inspect
import os
import shutil
import subprocess
import sys
import time
import zipfile

# --- third-party ---
import gradio as gr
import matplotlib
matplotlib.use("Agg")  # headless backend; must be selected before pyplot is imported
import numpy as np
import pandas as pd
import requests
import torch
import torch.nn.functional as F
import torchvision.transforms as transforms
from datasets import Dataset, load_dataset
from huggingface_hub import HfApi, hf_hub_download, login
from torch.utils.data import DataLoader
from torchvision import datasets

# from dummy_eval import foo
| # The variable "username" actually refers to "model name" | |
| # def fetch_required_files(exp_config): | |
| # # os.makedirs("temp_data", exist_ok=True) | |
| # for key in exp_config: | |
| # file_path = exp_config[key]['file'] | |
| # url = f"https://saraht14-server.hf.space/file/{file_path}.txt" | |
| # filename_only = os.path.basename(file_path) + ".txt" | |
| # local_path = os.path.join("./", filename_only) | |
| # downloaded = download_file(url, local_path) | |
| # if not downloaded: | |
| # raise Exception(f"Could not download file: {file_path}") | |
| # exp_config[key]["local_file"] = local_path | |
| # return exp_config | |
def download_file(url, local_path):
    """Download `url` to `local_path`, authenticating with the module-level HF_TOKEN.

    Returns the local path on success, or None on any failure (logged to stdout).
    """
    try:
        # FIX: a missing timeout could hang the Space indefinitely on a dead
        # endpoint; streaming with chunked writes avoids buffering the whole
        # payload in memory.
        r = requests.get(url,
                         headers={"Authorization": f"Bearer {HF_TOKEN}"},
                         timeout=60,
                         stream=True)
        r.raise_for_status()
        with open(local_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1 << 16):
                f.write(chunk)
        return local_path
    except Exception as e:
        print(f"Error downloading file from {url}: {e}")
        return None
| # def log_submission_request(username, zip_file): | |
| # try: | |
| # requests_ds = load_dataset("IndoorOutdoor/requests", split="train") | |
| # except Exception as e: | |
| # print("Could not load requests dataset, creating a new one.", e) | |
| # requests_ds = Dataset.from_dict({"username": [], "timestamp": [], "zip_filename": []}) | |
| # new_entry = {"username": username, | |
| # "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), | |
| # "zip_filename": os.path.basename(zip_file.name)} | |
| # updated_requests = requests_ds.add_item(new_entry) | |
| # updated_requests.push_to_hub("IndoorOutdoor/requests", token=HF_TOKEN) | |
| # print("Logged submission request to the requests dataset.") | |
| # def update_results_dataset(leaderboard_df): | |
| # repo_id = "saraht14/responses" | |
def update_results_dataset(new_row_df):
    """Append `new_row_df` (a pandas DataFrame of leaderboard rows) to the results dataset on the Hub.

    Returns the updated `datasets.Dataset`, or None if pushing failed entirely.
    """
    try:
        leaderboard_dataset = load_dataset(RESULTS_REPO, split="train", token=HF_TOKEN)
        leaderboard_df = leaderboard_dataset.to_pandas()
        updated_df = pd.concat([leaderboard_df, new_row_df], ignore_index=True)
        updated_dataset = Dataset.from_pandas(updated_df)
        updated_dataset.push_to_hub(RESULTS_REPO, token=HF_TOKEN)
        print("New row(s) added to existing leaderboard dataset.")
        return updated_dataset
    except Exception as e:
        # First submission (or transient load failure): start a fresh dataset.
        print("Dataset not found or failed to load, creating a new one.", e)
        try:
            new_dataset = Dataset.from_pandas(new_row_df)
            new_dataset.push_to_hub(RESULTS_REPO, token=HF_TOKEN)
            # BUG FIX: this print previously sat AFTER the return statement and
            # was unreachable.
            print("New leaderboard dataset created and uploaded.")
            return new_dataset
        except Exception as inner_e:
            print("Failed to create and push new leaderboard dataset:", inner_e)
            return None
# Info to change for your repository
# ----------------------------------
HF_TOKEN = os.environ.get("ALL_TOKEN")  # A read/write token for your org
# SECURITY FIX: the raw token was previously printed here, leaking the secret
# into the public Space logs. Only report whether it is present.
if not HF_TOKEN:
    print("WARNING: ALL_TOKEN is not set; Hub operations will fail.")
OWNER = "IndoorOutdoor"  # Change to your org - don't forget to create a results and request dataset, with the correct format!
# ----------------------------------
READ_TOKEN = os.environ.get("read_token")
# Download one known capture at startup — warms the HF cache and fails fast if
# the token lacks access to the private metadata dataset.
local_file_path = hf_hub_download(repo_id="IndoorOutdoor/metadata",
                                  filename="ali/home/office-gain-50-10-25-2023-16-16-03-dump1090.txt",
                                  repo_type="dataset",
                                  token=HF_TOKEN)
REPO_ID = f"{OWNER}/leaderboard"
QUEUE_REPO = f"{OWNER}/requests"
RESULTS_REPO = f"{OWNER}/results"
# Status string surfaced in the UI; mutated via set_error_message().
global_error_message = "Ready for submission!"
def set_error_message(message):
    """Overwrite the module-level status string shown in the UI."""
    global global_error_message
    global_error_message = message
    print("ERROR UPDATED:", message)  # debug trace for the Space logs
def get_error_message():
    """Return the current module-level status/error string."""
    return global_error_message
def install_requirements(file_path):
    """pip-install every non-empty, non-comment line of the requirements file `file_path`.

    Never raises: missing files and failed installs are reported to stdout,
    matching the best-effort contract the caller (update_leaderboard) expects.
    """
    try:
        with open(file_path, "r") as file:
            requirements = file.readlines()
        for req in requirements:
            package = req.strip()
            # Skip blank lines and requirements-file comments.
            if not package or package.startswith("#"):
                continue
            # FIX: invoke pip via the current interpreter (`python -m pip`)
            # instead of a bare `pip` executable, which may belong to a
            # different environment.
            subprocess.run([sys.executable, "-m", "pip", "install", package], check=True)
            print(f"Installed: {package}")
        print("All requirements installed successfully.")
    except FileNotFoundError:
        print(f"Error: {file_path} not found.")
    except subprocess.CalledProcessError as e:
        print(f"Installation failed: {e}")
# Column order of the leaderboard table — also the schema of the results dataset.
HEADERS = ["Model Name", "Group Name", "Execution Time (s)", "Accuracy", "TP", "FP", "FN", "TN"]
# Known receiver/base-station locations as (latitude, longitude) pairs,
# keyed by capture site.
BASE = {'ottawa':(45.30326753851309,-75.93640391349997),
        'ali_home':(37.88560412289598,-122.30218612514359),
        'josh_home':(37.8697406, -122.30218612514359),
        'cory':(37.8697406,-122.281570)}
def get_base(filename):
    """Map a capture filename to its base-station (lat, lon) by substring match.

    NOTE(review): "home" is tested before "josh", so a josh capture whose path
    also contains "home" resolves to ali_home — confirm this ordering is intended.
    """
    # First matching token wins; anything unmatched falls through to 'cory'.
    for token, site in (("home", "ali_home"), ("ottawa", "ottawa"), ("josh", "josh_home")):
        if token in filename:
            return BASE[site]
    return BASE["cory"]
# Ground-truth metadata for every capture file (scenario, indoor flag, sector flags).
metadata_path = "metadata.csv"
# NOTE(review): `dir` shadows the builtin and appears unused in this file.
dir = ""
df = pd.read_csv(metadata_path)
print(df.head())  # sanity-check the metadata at startup
def fetch_lb():
    """Load the leaderboard from the results dataset on the Hub, sorted best-first.

    Returns a pandas DataFrame with the HEADERS columns; an empty frame on any
    failure (e.g. the results dataset does not exist yet).
    """
    try:
        leaderboard_dataset = load_dataset(RESULTS_REPO, split="train", token=HF_TOKEN)
        leaderboard_data = leaderboard_dataset.to_pandas()
        leaderboard_data = leaderboard_data[HEADERS]  # keep it ordered
        # Highest accuracy first; ties broken by fastest execution time.
        leaderboard_data = leaderboard_data.sort_values(by=["Accuracy", "Execution Time (s)"], ascending=[False, True])
    except Exception as e:
        # FIX: the previous message was a placeholder-free f-string.
        print("Error loading leaderboard:", e)
        leaderboard_data = pd.DataFrame(columns=HEADERS)
    print(f"THIS IS THE LEADERBOARD:\n{leaderboard_data}")
    return leaderboard_data
# Populate the in-memory leaderboard once at startup.
leaderboard_data = fetch_lb()
def compute_stats_sector(sectors_model, sector_gt):
    """Score a per-sector prediction against per-sector ground truth.

    Both arguments are parallel lists of 0/1 flags over an 8-sector ring
    (the `% 8` wrap-around below fixes the ring size). A prediction adjacent
    to a true sector is forgiven: it counts as a TP rather than an FP/FN.

    Returns [TP, FP, FN, TN], each normalized by the 8 sectors.
    """
    NUM_SECTORS = 8  # fixed ring size; indices wrap with % NUM_SECTORS
    TP = FP = FN = TN = 0
    for i in range(len(sector_gt)):
        if sector_gt[i] == 1:
            # A true sector counts as detected if the model fired on it or on
            # either neighbor.
            if sectors_model[i] > 0 or sectors_model[(i + 1) % NUM_SECTORS] > 0 or sectors_model[(i - 1) % NUM_SECTORS] > 0:
                TP += 1
            else:
                FN += 1
        else:
            if sectors_model[i] > 0:
                # A firing next to a true sector is forgiven (counted as TP).
                if sector_gt[(i - 1) % NUM_SECTORS] > 0 or sector_gt[(i + 1) % NUM_SECTORS] > 0:
                    TP += 1
                    continue
                FP += 1
            else:
                TN += 1
    # FIX: removed the dead `ignored` counter — it was never incremented, so the
    # denominator was always 8; make that explicit.
    return [TP / NUM_SECTORS, FP / NUM_SECTORS, FN / NUM_SECTORS, TN / NUM_SECTORS]
| #Compare the model output with ground truth | |
| #return TP, FP, FN, TN | |
#This function computes stats when the model is binary, i.e., outputs only indoor vs outdoor
def compute_stats_in_out(sectors_model, indoor_gt):
    """Score a binary indoor/outdoor call; returns a one-hot [TP, FP, FN, TN].

    Any non-zero sector flag is treated as an "outdoor" prediction.
    """
    predicted_outdoor = any(sectors_model)
    if indoor_gt:
        # Ground truth indoor: an outdoor prediction is a false positive.
        return [0, 1, 0, 0] if predicted_outdoor else [0, 0, 0, 1]
    # Ground truth outdoor: an outdoor prediction is a true positive.
    return [1, 0, 0, 0] if predicted_outdoor else [0, 0, 1, 0]
def read_configuration(filename):
    """Parse the metadata CSV into {scenario: {'sectors', 'indoor', 'file'}}.

    Expected columns: file, scenario, indoor (TRUE/FALSE), then one TRUE/FALSE
    flag per sector. The header row is skipped and blank lines are ignored.
    """
    print("read config")
    with open(filename, 'r') as fh:  # FIX: the old local name shadowed the builtin `file`
        lines = fh.read().split('\n')
    lines = lines[1:]  # ignore the header
    print("head", lines)
    exp = {}
    for line in lines:
        if not line:
            continue
        tokens = line.split(',')
        data_file = tokens[0]
        scenario = tokens[1]
        # FIX: was `True if ... else 0`, mixing bool and int; False == 0, so
        # existing truthiness-based callers are unaffected.
        indoor = tokens[2] == "TRUE"
        exp[scenario] = {'sectors': [1 if x == "TRUE" else 0 for x in tokens[3:]],
                         'indoor': indoor,
                         "file": data_file}
    return exp
def evaluate_model(username, groupname, file):
    """Run a submitted script against every configured capture and update the leaderboard.

    `username` is actually the model name (see the note at the top of the file);
    `file` is the path to the submitted main.py. Returns the leaderboard as a
    list of rows, or None if the submitted function failed validation/execution.
    """
    print("evaluating...")
    global leaderboard_data
    username = username.strip()
    if not username:
        # Nothing to evaluate without a model name — return the board unchanged.
        return leaderboard_data.values.tolist()
    # NOTE(review): script_path is computed but never used below — presumably a
    # leftover from an earlier submission flow.
    script_path = f"submissions/{username}.py"
    os.makedirs("submissions", exist_ok=True)
    try:
        exp = read_configuration("metadata.csv")
        print(f"FIRST: {len(exp)}")
        # exp = fetch_required_files(exp)
        # print(f"SECOND: {len(exp)}")
        start_time = time.time()
        stats_model_sectors = []  # per-capture [TP, FP, FN, TN] from sector scoring
        stats_model_in_out = []   # per-capture [TP, FP, FN, TN] from binary scoring
        for key in exp:
            filename = exp[key]['file']
            indoor_gt = exp[key]['indoor']
            sectors_gt = exp[key]["sectors"]
            # file_path = os.path.join(dataset_directory, filename)
            # print(file_path)
            filename = filename + ".txt"
            print("FILE TO PROCESS:", filename)
            # Pull the capture from the private metadata dataset on the Hub.
            local_file_path = hf_hub_download(repo_id="IndoorOutdoor/metadata",
                                              filename=filename,
                                              repo_type="dataset",
                                              token=HF_TOKEN)
            sectors_model = import_and_run_function(file, "evaluate", local_file_path)
            if sectors_model == None:
                # Validation/execution error; details were recorded via set_error_message().
                return None
            try:
                # Best-effort cleanup of the downloaded capture.
                os.remove(local_file_path)
            except Exception as e:
                print(f"Warning: Couldn't delete {local_file_path} — {e}")
            # print(status)
            print(f"TYPE: {type(sectors_model), {type(sectors_model[0])}}")
            print("SECTORS MODEL: ", sectors_model)
            stats_model_sectors.append(compute_stats_sector(sectors_model, sectors_gt))
            stats_model_in_out.append(compute_stats_in_out(sectors_model, indoor_gt))
        execution_time = round(time.time() - start_time, 4)
        print("calculating summary stats")
        # Average the sector-level confusion entries over all captures.
        TP = np.mean([x[0] for x in stats_model_sectors])
        FP = np.mean([x[1] for x in stats_model_sectors])
        FN = np.mean([x[2] for x in stats_model_sectors])
        TN = np.mean([x[3] for x in stats_model_sectors])
        print("calculating exec stats")
        accuracy = round((TP + TN) / (TP + TN + FP + FN), 2)
        status = "Success" if accuracy > 0 else "Incorrect Model"
        # Row schema matches HEADERS: Model Name, Group Name, Execution Time (s),
        # Accuracy, TP, FP, FN, TN.
    except Exception as e:
        # Any failure during evaluation records a sentinel row: infinite time, -1 stats.
        leaderboard_data = pd.concat([leaderboard_data, pd.DataFrame([[username, groupname, float("inf"), 0,-1,-1,-1,-1]],
                                    columns=HEADERS)], ignore_index=True)
        return leaderboard_data.values.tolist()
    print("calculating new entry")
    new_entry = pd.DataFrame([[username, groupname, round(execution_time,2), round(accuracy,2), round(TP,2), round(FP,2), round(FN,2), round(TN,2)]],
                             columns=HEADERS)
    print("updating new entry")
    # Push the new row to the Hub; update_results_dataset returns a datasets.Dataset.
    leaderboard_data = update_results_dataset(new_entry)
    # leaderboard_data = pd.concat([leaderboard_data, new_entry], ignore_index=True)
    leaderboard_data = leaderboard_data.to_pandas() if leaderboard_data is not None else None
    if leaderboard_data is not None:
        leaderboard_data = leaderboard_data.sort_values(by=["Accuracy", "Execution Time (s)"], ascending=[False, True]).reset_index(drop=True)
    print(f"DATA: {leaderboard_data}")
    # NOTE(review): if the Hub push failed, leaderboard_data is None here and
    # `.values` raises — the caller's try/except absorbs that.
    return leaderboard_data.values.tolist()
def import_and_run_function(script_path, function_name, filename):
    """Import `script_path` as a module and run its `function_name(filename)`.

    Validates that the function exists, takes exactly one positional parameter,
    and returns a list of eight 0/1 integers. On any failure a message is
    recorded via set_error_message() and None is returned.

    NOTE: exec_module runs the submission's top-level code — submissions are
    implicitly trusted in this Space.
    """
    if not os.path.exists(script_path):
        set_error_message("Error: Script not found.")
        return None
    if not script_path.endswith(".py"):
        set_error_message("Error: Provided file is not a Python script.")
        return None
    module_name = os.path.splitext(os.path.basename(script_path))[0]
    try:
        spec = importlib.util.spec_from_file_location(module_name, script_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    except SyntaxError as e:
        set_error_message("Error: Syntax error in the script.")
        return None
    except ImportError as e:
        set_error_message("Error: Import issue in the script.")
        return None
    except Exception as e:
        set_error_message("Error: Failed to import script.")
        return None
    if not hasattr(module, function_name):
        set_error_message("Error: Function (evaluate) not found in script.")
        return None
    function_to_run = getattr(module, function_name)
    try:
        sig = inspect.signature(function_to_run)
        print("FUNCTION TO RUN")
        print(function_to_run)
        params = list(sig.parameters.values())
        if len(params) != 1 or params[0].kind not in [inspect.Parameter.POSITIONAL_OR_KEYWORD]:
            set_error_message("Error: Function must have exactly one parameter (filepath).")
            return None
    except Exception as e:
        set_error_message("Error: Unable to inspect function signature.")
        return None
    try:
        result = function_to_run(filename)
        # FIX: don't index result[0] here — the result has not been validated
        # yet, and an empty list would raise IndexError and be misreported as
        # "Function raised an error during execution".
        print(f"TYPE: {type(result)}, RESULT: {result}")
    except Exception as e:
        set_error_message("Error: Function raised an error during execution.")
        return None
    if not isinstance(result, list):
        set_error_message("Error: Function must return a list.")
        return None
    if len(result) != 8:
        set_error_message("Error: Function must return a list of exactly 8 elements.")
        return None
    if not all(isinstance(x, int) and x in [0, 1] for x in result):
        set_error_message("Function must return a list of 8 integers, each 0 or 1.")
        # BUG FIX: this path previously returned a (message, None) tuple, which
        # the caller would have treated as a valid sector list. Every error
        # path must return None.
        return None
    print(f"Function '{function_name}' executed successfully. Output: {result}")
    return result
def update_leaderboard(username, groupname, zip_file):
    """Handle one submission: extract the ZIP, install its deps, evaluate, refresh the board.

    Returns (status_message, leaderboard_rows_or_None) for the two Gradio outputs.
    """
    if not zip_file:
        set_error_message("No file uploaded.")
        return get_error_message(), None
    zip_path = zip_file.name
    # Extract into a per-model directory named after the submitted model name.
    extract_path = os.path.join("", username)
    try:
        if not os.path.exists(extract_path):
            os.makedirs(extract_path)
    except OSError:
        set_error_message("Error creating directory for extraction.")
        return get_error_message(), None
    try:
        with zipfile.ZipFile(zip_path, "r") as zip_ref:
            zip_ref.extractall(extract_path)
    except zipfile.BadZipFile:
        set_error_message("Invalid ZIP file.")
        return "Invalid ZIP file.", None
    except Exception as e:
        set_error_message("Error extracting ZIP file.")
        return f"Error extracting ZIP file: {str(e)}", None
    extracted_files = os.listdir(extract_path)
    print("EXTRACTED FILES:", extracted_files)
    req_file = os.path.join(extract_path, "user_reqs.txt")
    if "user_reqs.txt" not in extracted_files:
        set_error_message("Missing user_reqs.txt in ZIP file.")
        return "Missing user_reqs.txt in ZIP file.", None
    try:
        install_requirements(req_file)
    except Exception as e:
        set_error_message("Error installing dependencies.")
        return f"Error installing dependencies: {str(e)}", None
    python_script = os.path.join(extract_path, "main.py")
    # FIX: this check appeared twice verbatim; one check is sufficient.
    if "main.py" not in extracted_files:
        set_error_message("No Python script (main.py) found in ZIP.")
        return "No Python script (main.py) found in ZIP.", None
    try:
        updated_leaderboard = evaluate_model(username, groupname, python_script)
        if updated_leaderboard is None:  # FIX: identity check instead of == None
            # (also fixes the "occured"/"evaluting" typos in the user-facing message)
            return "An error occurred while evaluating the model", None
    except Exception as e:
        print("Error in eval mode:", str(e))
        return f"Error evaluating model: {str(e)}", None
    # log_submission_request(username, zip_file)
    set_error_message("Submission successful!")
    # Refresh from the Hub. NOTE(review): this assignment is local and unused;
    # the call is kept for its logging side effect.
    leaderboard_data = fetch_lb()
    return "Submission successful!", updated_leaderboard
# ---------------- Gradio UI ----------------
with gr.Blocks() as demo:
    gr.Markdown("# 🚀 Indoor vs Outdoor Detection Leaderboard \nUsing the provided dataset, submit a model that can predict if a device is inside or outside. \nSee the README for submission details.")
    with gr.Row():
        # "Model Name" is stored in the variable `username_input` (see the note
        # at the top of the file: "username" means model name throughout).
        username_input = gr.Textbox(label="Model Name")
        groupname_input = gr.Textbox(label = "Group Name")
        file_input = gr.File(label="Upload Zip File")
    submit_button = gr.Button("Submit File")
    status_output = gr.Textbox(label="Status", interactive=False)
    with gr.Row():
        # Passing the function itself (not its result) as `value` makes Gradio
        # call fetch_lb on each page load, so the table stays fresh.
        leaderboard_display = gr.Dataframe(
            headers=HEADERS,
            value=fetch_lb,
            label="Leaderboard"
        )
    # Submission drives both the status text and the leaderboard table.
    submit_button.click(fn=update_leaderboard,
                        inputs=[username_input, groupname_input, file_input],
                        outputs=[status_output, leaderboard_display])
    #submit_button.click(fn=fetch_lb, inputs=[], outputs=[])
    # Whenever the status textbox changes, re-read the global error message.
    status_output.change(fn=get_error_message, inputs=[], outputs=status_output)
demo.launch()