Spaces:
Runtime error
Runtime error
| # %% | |
| from PIL import Image | |
| import torch | |
| import torchvision | |
| from torchvision import transforms | |
| from torchvision.transforms.functional import InterpolationMode | |
| import json | |
| from collections import defaultdict | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import random | |
| from os import listdir | |
| from os.path import isfile, join | |
| from torchvision.io import read_image | |
| from torchvision.utils import draw_bounding_boxes | |
| from PIL import Image | |
| import os | |
| from scipy.stats import rankdata | |
| import tqdm | |
| import streamlit as st | |
| import pandas as pd | |
| # %% | |
| def load_json(PATH): | |
| if os.path.isfile(PATH) and os.access(PATH, os.R_OK): | |
| with open(PATH) as json_file: | |
| dict_data = json.load(json_file) | |
| else: | |
| print("The Path of", PATH,"is not exist") | |
| dict_data = {} | |
| return dict_data | |
| def get_list_folder(PATH): | |
| return [name for name in os.listdir(PATH) if os.path.isdir(os.path.join(PATH, name))] | |
| def get_file_only(PATH): | |
| return [f for f in os.listdir(PATH) if os.path.isfile(os.path.join(PATH, f))] | |
| # %% | |
| def compute_ndcg(ranks, scores, k=3): | |
| """ | |
| ranks = [5, 1, 4, 2, 3] | |
| scores = [0.1, 0.5, 0.3, 0.95, 1.0] | |
| """ | |
| rank_score_tuple = list(zip(ranks, scores)) | |
| top_k = sorted(rank_score_tuple, key=lambda x: x[1], reverse=True)[:k] | |
| dcg = sum([score / np.log2(rank + 1) for rank, score in top_k]) | |
| ideal_dcg = sum([score / np.log2(idx + 2) for idx, (_, score) in enumerate(top_k)]) | |
| ndcg = dcg / ideal_dcg | |
| return ndcg | |
| def compute_ndcg_score_per_mode(pred_rank_dict, gpt_rel_scores, random_sample_dict, mode='indrect', split='test', k=200): | |
| ndcg_scores = [] | |
| for key in tqdm.tqdm(pred_rank_dict.keys(), total=len(pred_rank_dict.keys())): | |
| gpt_scores_for_key = [gpt_rel_scores[key][cand_key] if cand_key in gpt_rel_scores[key] else 0.0 for cand_key in random_sample_dict[key]] | |
| pred_rank_for_key = pred_rank_dict[key] | |
| ndcg_score = compute_ndcg(pred_rank_for_key, gpt_scores_for_key, k=k) | |
| ndcg_scores.append(ndcg_score) | |
| avg_ndcg_score = sum(ndcg_scores) / len(ndcg_scores) | |
| print(f"Random split, mode={mode} ndcg score: ", avg_ndcg_score) | |
| return avg_ndcg_score | |
| # %% | |
| def get_score_direct(random_sample_pair_test_direct, predictions, key_pair, similarity_score_test_direct, k = 200): | |
| mode = 'direct' | |
| i2t_ranks = [] | |
| t2i_ranks = [] | |
| i2t_rank_dict = {} | |
| results_dict = {} | |
| key_pair_reversed = {v: k for k, v in key_pair.items()} | |
| for file_key in tqdm.tqdm(random_sample_pair_test_direct.keys(), total=len(random_sample_pair_test_direct.keys())): | |
| i2t_rank = rankdata([predictions[str(file_key)+':'+str(key_pair[k])] for k in random_sample_pair_test_direct[file_key]]) | |
| t2i_rank = rankdata([predictions[str(key_pair_reversed[key_pair[k]])+':'+str(key_pair[file_key])] for k in random_sample_pair_test_direct[file_key]]) | |
| i2t_ranks.append(i2t_rank[-1]) | |
| t2i_ranks.append(t2i_rank[-1]) | |
| i2t_rank_dict[file_key] = i2t_rank | |
| assert len(i2t_ranks) == len(t2i_ranks) == 1000 | |
| ndcg_score = compute_ndcg_score_per_mode(i2t_rank_dict, similarity_score_test_direct, random_sample_pair_test_direct, mode='indirect', split='test', k=200) | |
| results_dict['direct'] = {} | |
| results_dict['direct']['i2t rank'] = float(sum(i2t_ranks) / len(i2t_ranks)) | |
| results_dict['direct']['t2i rank'] = float(sum(t2i_ranks) / len(t2i_ranks)) | |
| results_dict['direct']['ndcg score'] = float(ndcg_score) | |
| print(f"Random split, mode={mode} i2t rank: ", sum(i2t_ranks) / len(i2t_ranks)) | |
| print(f"Random split, mode={mode} t2i rank: ", sum(t2i_ranks) / len(t2i_ranks)) | |
| return results_dict | |
| # %% | |
| def get_score_indirect(random_sample_pair_test_indirect, predictions, key_pair, similarity_score_test_indirect, k = 200): | |
| mode = 'indirect' | |
| i2t_ranks = [] | |
| t2i_ranks = [] | |
| i2t_rank_dict = {} | |
| results_dict = {} | |
| key_pair_reversed = {v: k for k, v in key_pair.items()} | |
| for file_key in tqdm.tqdm(random_sample_pair_test_indirect.keys(), total=len(random_sample_pair_test_indirect.keys())): | |
| i2t_rank = rankdata([predictions[str(file_key)+':'+str(key_pair[k])] for k in random_sample_pair_test_indirect[file_key]]) | |
| t2i_rank = rankdata([predictions[str(key_pair_reversed[key_pair[k]])+':'+str(key_pair[file_key])] for k in random_sample_pair_test_indirect[file_key]]) | |
| i2t_ranks.append(i2t_rank[-1]) | |
| t2i_ranks.append(t2i_rank[-1]) | |
| i2t_rank_dict[file_key] = i2t_rank | |
| assert len(i2t_ranks) == len(t2i_ranks) == 1000 | |
| ndcg_score = compute_ndcg_score_per_mode(i2t_rank_dict, similarity_score_test_indirect, random_sample_pair_test_indirect, mode='indrect', split='test', k=200) | |
| results_dict['indirect'] = {} | |
| results_dict['indirect']['i2t rank'] = float(sum(i2t_ranks) / len(i2t_ranks)) | |
| results_dict['indirect']['t2i rank'] = float(sum(t2i_ranks) / len(t2i_ranks)) | |
| results_dict['indirect']['ndcg score'] = float(ndcg_score) | |
| print(f"Random split, mode={mode} i2t rank: ", sum(i2t_ranks) / len(i2t_ranks)) | |
| print(f"Random split, mode={mode} t2i rank: ", sum(t2i_ranks) / len(t2i_ranks)) | |
| return results_dict | |
| # %% | |
| def main(json_file): | |
| ### Setup | |
| # os.environ['ROOT'] = os.path.dirname(os.path.realpath(__file__)) | |
| # %% | |
| ### Load data | |
| # if os.path.isfile(os.path.join(os.environ['ROOT'], json_file)): #'results_pair_dict.json')): | |
| # predictions_file_path = os.path.join(os.environ['ROOT'], json_file) #'results_pair_dict.json') | |
| # else: | |
| # predictions_file_path = os.path.join(os.environ['ROOT'], json_file) #'data/results_pair_dict.json') | |
| # with open(predictions_file_path) as f: | |
| # predictions = json.load(f) | |
| predictions = json_file | |
| with open(os.path.join(os.environ['ROOT'], 'data/key_pair.json')) as f: | |
| key_pair = json.load(f) | |
| # key_pair_reversed = {v: k for k, v in key_pair.items()} | |
| with open(os.path.join(os.environ['ROOT'], 'data/random_sample_test_direct_ids.json')) as f: | |
| random_sample_pair_test_direct = json.load(f) | |
| with open(os.path.join(os.environ['ROOT'], 'data/random_sample_test_indirect_ids.json')) as f: | |
| random_sample_pair_test_indirect = json.load(f) | |
| with open(os.path.join(os.environ['ROOT'], 'data/chatgpt_similarity_score_test_direct.json')) as f: | |
| similarity_score_test_direct = json.load(f) | |
| with open(os.path.join(os.environ['ROOT'], 'data/chatgpt_similarity_score_test_indirect.json')) as f: | |
| similarity_score_test_indirect = json.load(f) | |
| # %% | |
| ### Compute scores | |
| print("computing the score !!") | |
| result_direct = get_score_direct(random_sample_pair_test_direct, predictions, key_pair, similarity_score_test_direct, k = 200) | |
| result_indirect = get_score_indirect(random_sample_pair_test_indirect, predictions, key_pair, similarity_score_test_indirect, k = 200) | |
| result_dict = {**result_direct, **result_indirect} | |
| return result_dict | |
| # %% | |
| if __name__ == '__main__': | |
| os.environ['ROOT'] = os.path.dirname(os.path.realpath(__file__)) | |
| st.title("Evaluation Server for Driving Hazard Prediction and Reasoning ") | |
| st.image(os.path.join(os.environ['ROOT'],'data/preview_image.jpeg')) | |
| st.divider() | |
| result_text = '' | |
| result_dict = {} | |
| uploaded_files = None | |
| json_file = None | |
| uploaded_files = st.file_uploader("Upload All Result Files Here (results_pair_dict1.csv, results_pair_dict2.csv)", type=["csv"], accept_multiple_files=True) | |
| dataframe = pd.DataFrame([]) | |
| if uploaded_files != None: | |
| print("upload file process") | |
| for i in range(len(uploaded_files)): | |
| dataframe = pd.concat([dataframe, pd.read_csv(uploaded_files[i])]) | |
| result = dataframe.to_dict('tight')['data'] | |
| json_file = {} | |
| for i in range(len(result)): | |
| json_file[str(result[i][1])] = float(result[i][2]) | |
| if st.button('Run Evaluation with no upload files (using demo files)'): | |
| json_file1 = load_json(os.path.join(os.environ['ROOT'], 'results_pair_dict1.json')) | |
| json_file2 = load_json(os.path.join(os.environ['ROOT'], 'results_pair_dict2.json')) | |
| json_file = {**json_file1, **json_file2} | |
| print("finished loading json") | |
| if len(json_file) >= 1: | |
| print("running evaluation") | |
| result_dict = main(json_file) | |
| result_text = json.dumps(result_dict) | |
| st.download_button('Download Results', result_text) | |
| st.json(result_dict) | |
| # !streamlit run app.py --server.fileWatcherType none | |
| # if st.button('Load Results File1 from local instead'): | |
| # json_file1_path = os.path.join(os.environ['ROOT'], 'results_pair_dict1.json') | |
| # json_file1 = load_json(json_file1_path) | |
| # st.write(json_file1) | |
| # if uploaded_files is not None: | |
| # with open(uploaded_file1) as jf: | |
| # json_file1 = json.load(jf) | |
| # json_file1 = load_json(uploaded_file1) | |
| # uploaded1 = True | |
| # uploaded_file2 = st.file_uploader("Upload Results File2") | |
| # if st.button('Load Results File2 from local instead'): | |
| # json_file2_path = os.path.join(os.environ['ROOT'], 'results_pair_dict2.json') | |
| # json_file2 = load_json(json_file2_path) | |
| # st.write(json_file2) | |
| # if uploaded_file2 is not None: | |
| # with open(uploaded_file2) as jf: | |
| # json_file2 = json.load(jf) | |
| # uploaded2 = True | |
| # # if uploaded1 and uploaded2: | |
| # json_file = {**json_file1, **json_file2} | |
| # %% | |