|
|
import datetime
|
|
|
import os
|
|
|
import random
|
|
|
import re
|
|
|
from collections import defaultdict
|
|
|
from pathlib import Path
|
|
|
|
|
|
import numpy as np
|
|
|
import yaml
|
|
|
from loguru import logger as eval_logger
|
|
|
from PIL import Image
|
|
|
|
|
|
from lmms_eval.tasks._task_utils.file_utils import generate_submission_file
|
|
|
|
|
|
# Absolute directory containing this task file (used to locate task-local resources).
dir_name = os.path.dirname(os.path.abspath(__file__))


# Benchmark subfields of AV-Odyssey; results are grouped/averaged per subfield
# during aggregation.
eval_type_dict = {
    "Subfield": [
        "Timbre",
        "Tone",
        "Melody",
        "Space",
        "Time",
        "Hallucination",
        "Intricacy",
    ],
}
|
|
|
|
|
|
# Load the task's YAML config, dropping "!function" lines: they reference
# Python callables that plain yaml.safe_load cannot parse.
with open(Path(__file__).parent / "av_odyssey.yaml", "r") as f:
    raw_data = f.readlines()
    safe_data = []
    for i, line in enumerate(raw_data):
        # remove function definition since yaml load cannot handle it
        if "!function" not in line:
            safe_data.append(line)

    config = yaml.safe_load("".join(safe_data))

# Root of the locally cached dataset media: $HF_HOME (or the default HF cache
# path) joined with the cache_dir declared in the YAML's dataset_kwargs.
# NOTE(review): hf_home is not expanduser()-ed here — "~" is only resolved if
# a downstream consumer expands it; confirm against the download code.
hf_home = os.getenv("HF_HOME", "~/.cache/huggingface/")
cache_dir = os.path.join(hf_home, config["dataset_kwargs"]["cache_dir"])

# Suffix appended to every question so the model replies with a bare option letter.
question_prompt = "Answer with the option's letter from the given choices directly."
|
|
|
|
|
|
|
|
|
def split_media_tags(content):
    """Split *content* on inline media placeholders such as ``[audio1]``.

    Recognized tags are ``[audioN]``, ``[videoN]`` and ``[imgN]`` (N is a
    1-based integer).  The return value interleaves plain-text segments
    (str) with media references as ``(media_type, number)`` tuples, e.g.
    ``["look at ", ("img", 1), " now"]``.  A string without any tag is
    returned unchanged as a single-element list.
    """
    tag_pattern = re.compile(r"\[(audio|video|img)(\d+)\]")

    pieces = []
    cursor = 0
    for tag in tag_pattern.finditer(content):
        # Keep any (non-empty) text sitting between the previous tag and this one.
        if tag.start() > cursor:
            pieces.append(content[cursor : tag.start()])
        pieces.append((tag.group(1), int(tag.group(2))))
        cursor = tag.end()

    # No tag matched at all: hand back the original string untouched.
    if not pieces:
        return [content]

    # Trailing text after the last tag.
    if cursor < len(content):
        pieces.append(content[cursor:])

    return pieces
|
|
|
|
|
|
|
|
|
def av_odyssey_doc_to_visual(doc):
    """Resolve the media files referenced by a document's question text.

    Relative media paths stored in *doc* are resolved against the dataset
    ``cache_dir``; the absolute paths are then returned in exactly the order
    the ``[audio#]``/``[video#]``/``[img#]`` tags appear in the prompt built
    by ``get_text``.

    Args:
        doc: an instance of the eval dataset with ``data_type``,
            ``audio_path`` and (depending on type) ``image_path`` /
            ``video_path`` fields.

    Returns:
        list[str]: absolute media paths, ordered by tag occurrence.
    """

    def _resolve(relative_paths, kind):
        # Map dataset-relative paths to absolute ones under cache_dir,
        # warning (instead of silently dropping) when a file is missing.
        found = []
        for relative_path in relative_paths:
            abs_path = os.path.join(cache_dir, relative_path)
            if os.path.exists(abs_path):
                found.append(abs_path)
            else:
                eval_logger.warning(f"{kind} path does not exist: {abs_path}")
        return found

    image_data, video_data = [], []
    # Visual media are mutually exclusive per doc: image-type docs never load
    # videos and vice versa (mirrors the original if/elif).
    if "image" in doc["data_type"]:
        image_data = _resolve(doc["image_path"], "Image")
    elif "video" in doc["data_type"]:
        video_data = _resolve(doc["video_path"], "Video")

    # Every doc carries audio.
    audio_data = _resolve(doc["audio_path"], "Audio")

    # Walk the prompt segments and pick the media in tag order; tags are 1-based.
    result = []
    for segment in get_text(doc):
        if isinstance(segment, str):
            continue
        media_type, media_num = segment
        index = media_num - 1
        if media_type == "audio":
            result.append(audio_data[index])
        elif media_type == "video":
            result.append(video_data[index])
        elif media_type == "img":
            result.append(image_data[index])

    return result
|
|
|
|
|
|
|
|
|
def get_text(doc):
    """Compose the full prompt for *doc* and split it into media segments.

    The prompt is the question, followed by the four answer options (one per
    line) and the letter-only answer instruction; the assembled string is
    passed through ``split_media_tags`` so media placeholders become tuples.
    """
    option_block = "".join(option + "\n" for option in doc["options"][:4])
    prompt = f"{doc['question']}\n{option_block}{question_prompt}"
    return split_media_tags(prompt)
|
|
|
|
|
|
|
|
|
def av_odyssey_doc_to_text(doc, lmms_eval_specific_kwargs=None):
    """Render the document's prompt as plain text with media placeholders.

    Each media reference produced by ``get_text`` is replaced with a
    sequential ``<media_i>`` token (i = 0, 1, ... in order of appearance);
    plain-text segments pass through unchanged.

    Args:
        doc: an instance of the eval dataset.
        lmms_eval_specific_kwargs: unused; kept for the lmms-eval interface.

    Returns:
        str: the prompt with media tags replaced by ``<media_i>`` tokens.
    """
    parts = []
    media_idx = 0  # renamed from `id`, which shadowed the builtin
    for segment in get_text(doc):
        if isinstance(segment, str):
            parts.append(segment)
        else:
            parts.append(f"<media_{media_idx}>")
            media_idx += 1
    return "".join(parts)
|
|
|
|
|
|
|
|
|
def parse_multi_choice_response(response, all_choices, index2ans):
    """
    Parse the prediction from the generated response.
    Return the predicted index e.g., A, B, C, D.

    Heuristic cascade (adapted from the MMMU answer parser):
      1. look for a bare choice letter anywhere in the response;
      2. failing that, look for a letter surrounded by spaces;
      3. failing that — and only for responses longer than 5 words — match
         the full answer text from index2ans;
      4. with no match at all, default to "A".
    Ties between several candidates are broken by whichever occurs last.
    """
    # Strip common trailing/leading punctuation, then pad with spaces so
    # " X " style matching also works at the string boundaries.
    for char in [",", ".", "!", "?", ";", ":", "'"]:
        response = response.strip(char)
    response = " " + response + " "

    index_ans = True  # True while candidates are option letters, not answer text
    ans_with_brack = False
    candidates = []
    for choice in all_choices:
        if f"{choice}" in response:
            candidates.append(choice)
            # NOTE(review): this branch matches a *bare* letter, yet it sets
            # ans_with_brack, which makes the tie-break below search for
            # "({can})" — a pattern that may be absent from the response
            # (rfind then yields -1 for every candidate). Looks like an
            # inherited quirk of the upstream parser; confirm before changing.
            ans_with_brack = True

    if len(candidates) == 0:
        # Retry requiring the letter to stand alone (space-delimited).
        for choice in all_choices:
            if f" {choice} " in response:
                candidates.append(choice)

    # Still nothing and the response is verbose: try to match the full
    # answer text instead of the option letter.
    if len(candidates) == 0 and len(response.split()) > 5:
        for index, ans in index2ans.items():
            if ans.lower() in response.lower():
                candidates.append(index)
                index_ans = False  # matched by content, not by letter

    if len(candidates) == 0:
        # No signal at all: fall back to "A" rather than failing.
        pred_index = "A"
    elif len(candidates) > 1:
        # Several candidates: keep the one mentioned last in the response.
        start_indexes = []
        if index_ans:
            if ans_with_brack:
                for can in candidates:
                    index = response.rfind(f"({can})")
                    start_indexes.append(index)
            else:
                for can in candidates:
                    index = response.rfind(f" {can} ")
                    start_indexes.append(index)
        else:
            for can in candidates:
                index = response.lower().rfind(index2ans[can].lower())
                start_indexes.append(index)

        pred_index = candidates[np.argmax(start_indexes)]
    else:
        pred_index = candidates[0]

    return pred_index
|
|
|
|
|
|
|
|
|
def av_odyssey_process_results(doc, results):
    """
    Args:
        doc: a instance of the eval dataset
        results: [pred]
    Returns:
        a dictionary with key: metric name (in this case av_odyssey score), value: metric value

    Raises:
        ValueError: if the parsed prediction or the ground-truth answer is
            not one of the option letters A-D.
    """
    pred = results[0]
    options = doc["options"]
    # Options are stored as "X. <text>"; slicing off the 3-char prefix leaves
    # the bare answer text used for content matching in the parser.
    option_list = {"A": options[0][3:], "B": options[1][3:], "C": options[2][3:], "D": options[3][3:]}
    answer = parse_multi_choice_response(pred, ["A", "B", "C", "D"], option_list)
    gt_answer = doc["answer"]

    # Validate explicitly instead of `assert`, which is stripped under -O.
    if answer not in ("A", "B", "C", "D"):
        raise ValueError(f"Parsed answer {answer!r} is not a valid option letter")
    if gt_answer not in ("A", "B", "C", "D"):
        raise ValueError(f"Ground-truth answer {gt_answer!r} is not a valid option letter")

    score = 1.0 if answer == gt_answer else 0.0
    return {
        "av_odyssey_score": {
            "question_id": doc["question_id"],
            "category": doc["subfield"],
            "score": score,
        }
    }
|
|
|
|
|
|
|
|
|
def av_odyssey_aggregate_results(results):
    """
    Args:
        results: a list of values returned by process_results
    Returns:
        The overall average score (percentage, 0-100) across all questions;
        0.0 when *results* is empty (instead of a ZeroDivisionError).
    """
    # category -> {question_id: [scores]}. Only the first recorded score per
    # question counts toward the average, so duplicate results are ignored.
    category2score = defaultdict(dict)
    for result in results:
        question_id = result["question_id"]
        score = result["score"]
        category = result["category"]
        if question_id not in category2score[category]:
            category2score[category][question_id] = []
        category2score[category][question_id].append(score)

    category_avg_scores = {}
    total_score = 0
    total_questions = 0
    for category, questions in category2score.items():
        category_total = sum(scores[0] for scores in questions.values())
        category_avg_scores[category] = category_total / len(questions) * 100.0
        total_score += category_total
        total_questions += len(questions)

    # Empty input: nothing to average, report 0 rather than dividing by zero.
    if total_questions == 0:
        print("No results to aggregate.")
        return 0.0

    overall_avg_score = total_score / total_questions * 100.0

    print("Average scores per category:")
    for category, avg_score in category_avg_scores.items():
        print(f"{category}: {avg_score:.2f}")

    print(f"Overall average score (across all questions): {overall_avg_score:.2f}")

    return overall_avg_score
|
|
|
|