File size: 14,017 Bytes
3070e58 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
import pandas as pd
import json
from .CONSTANTS import *
class ExcelProcessor:
def __init__(self, excel_path, invalid_models=None):
"""Initialize the ExcelProcessor with an Excel file.
Args:
excel_path (str): Path to the Excel file containing model and task data.
"""
# excel_path = path to excel file
self.sheet_path = excel_path
self.excel_data = self.load_excel()
self.model_sheet = self.load_sheet("Models (Simplified)")
self.invalid_models = invalid_models
print("You have excluded the following models: ", self.invalid_models)
# Get all of the valid models (exclude invalid models)
self.valid_models = self.get_valid_models(self.invalid_models)
# print("VALID MODELS: ", self.valid_models)
def load_excel(self):
"""Load the Excel file into a pandas ExcelFile object.
Returns:
pd.ExcelFile: The loaded Excel file object.
"""
return pd.ExcelFile(self.sheet_path)
def load_sheet(self, sheet_name):
"""Load a specific sheet from the Excel file.
Args:
sheet_name (str): Name of the sheet to load.
Returns:
pd.DataFrame: The loaded sheet as a pandas DataFrame.
"""
return self.excel_data.parse(sheet_name)
def get_valid_models(self, invalid_models=None):
"""Get all valid models from the Models sheet, excluding invalid ones.
Returns:
list: List of valid model names that should be included in evaluation.
"""
valid_models = []
for idx, model_name in enumerate(self.model_sheet["Name"]):
if model_name not in invalid_models:
valid_models.append(model_name)
return valid_models
def get_valid_columns(self, sheet_name):
"""Get all non-empty columns from a specified sheet.
Args:
sheet_name (str): Name of the sheet to analyze.
Returns:
list: List of valid column names (excluding unnamed columns).
"""
valid_columns = []
for column in self.load_sheet(sheet_name).columns:
if column.split(' ')[0] != "Unnamed:":
valid_columns.append(column.strip())
return valid_columns
def get_model_information(self,
sheet_name = "Models (Simplified)",
name_column = "Name",
domain_column = "Domain",
license_column = "License",
size_column = "Size (B)",
):
"""Extract model information from the Models sheet.
Args:
sheet_name (str, optional): Name of the sheet containing model info.
Defaults to "Models (Simplified)".
name_column (str, optional): Column name containing model names.
Defaults to "Name".
domain_column (str, optional): Column name containing model domains.
Defaults to "Domain".
license_column (str, optional): Column name containing license info.
Defaults to "License".
size_column (str, optional): Column name containing model sizes.
Defaults to "Size (B)".
Returns:
tuple: A tuple containing 7 dictionaries:
- model_name_info: Model names indexed by position
- domain_info: Model domains mapped using DOMAIN_MAPPING
- license_info: License information (abbreviated if needed)
- accessibility_info: Accessibility mapped using LICENSE_MAPPING
- displayed_size_info: Raw size values for display
- hidden_size_info: Size ranges for filtering
- T_info: Position markers for the leaderboard
"""
# Load the model sheet
model_sheet = self.load_sheet(sheet_name)
# Everything to be returned.
T_info = {}
model_name_info = {}
domain_info = {}
license_info = {}
accessibility_info = {}
displayed_size_info = {} # shown on leaderboard
hidden_size_info = {} # hidden column
def map_size(param_size):
"""Map parameter size to predefined ranges.
Args:
param_size: The parameter size value.
Returns:
str: Size range category.
"""
if param_size == "/":
return "None"
if param_size == "Unknown":
return "Unknown"
size = int(param_size)
if size < 5:
return "0-5"
elif size < 10:
return "5-10"
elif size < 40:
return "10-40"
elif size < 80:
return "40-80"
else:
return ">80"
i = 0
for name, domain, license, size in zip(model_sheet[name_column],
model_sheet[domain_column],
model_sheet[license_column],
model_sheet[size_column]):
# If it is a valid model (used in evaluation)
if name in self.valid_models:
T_info[f"{i}"] = "\ud83d\udd36"
model_name_info[f"{i}"] = name
domain_info[f"{i}"] = DOMAIN_MAPPING[domain]
if license == "PhysioNet Credentialed Health Data License 1.5.0":
license_info[f"{i}"] = "PhysioNet 1.5.0" # Abbreviate license name to fit on leaderboard
else:
license_info[f"{i}"] = license
accessibility_info[f"{i}"] = LICENSE_MAPPING[license]
displayed_size_info[f"{i}"] = size
hidden_size_info[f"{i}"] = map_size(size)
i += 1
else:
print("Invalid model: ", name)
return model_name_info, domain_info, license_info, accessibility_info, displayed_size_info, hidden_size_info, T_info
def get_sheet_information(self, sheets_list, task_names_list, task_types_list):
"""Extract task performance information from specified sheets.
Args:
sheets_list (list): List of sheet names to process.
task_names_list (list): List of task names corresponding to each sheet.
task_types_list (list): List of task types ('ext', 'gen', etc.) for each sheet.
Returns:
dict: Dictionary mapping task names to model performance data.
Format: {task_name: {model_index: performance_score}}
"""
task_info = {}
# Iterate through each row
for idx, sheet in enumerate(sheets_list):
# Get the task type (tt)
tt = task_types_list[idx]
# Load the sheet
model_sheet = self.load_sheet(sheet)
# Name of the task (i.e. 1.1-ADE Identification)
task_name = task_names_list[idx]
# Get all columns in the sheet
for i, t in enumerate(model_sheet['Task Type']):
if i == 0:
continue
# Break out of loop when it reaches the end of the sheet
if t == "-":
break
row = i
task_counter = 0
for model in self.valid_models:
column_name = model.strip()
if column_name == "gpt-35-turbo-0125":
column_name = "gpt-35-turbo"
elif column_name == "gpt-4o-0806":
column_name = "gpt-4o"
elif column_name == "gemini-2.0-flash-001":
column_name = "gemini-2.0-flash"
elif column_name == "gemini-1.5-pro-002":
column_name = "gemini-1.5-pro"
if column_name == "gpt-oss-20b":
column_name = "gpt-oss-20b-high"
elif column_name == "gpt-oss-120b":
column_name = "gpt-oss-120b-high"
if tt == 'ext':
column_name = column_name + '.1'
elif tt == 'gen':
column_name = column_name + '.1'
# Name of the task (i.e 1.1-ADE Identification)
task = model_sheet[task_name][row]
# Update task name to more simple version
task = TASK_MAPPING[task]
if task == "Average score":
break
# Update the information for each task
if task not in task_info:
task_info[task] = {}
task_info[task][f"{task_counter}"] = round(float(model_sheet[column_name.strip()][row].split(" ")[0]), 2)
task_counter += 1
return task_info
def add_average_performance(self, task_info):
"""Calculate average performance across all tasks for each model.
Args:
task_info (dict): Dictionary containing task performance data.
Format: {task_name: {model_index: performance_score}}
Returns:
dict: Dictionary mapping model indices to average performance scores.
Format: {model_index: average_score}
"""
for task in task_info:
n = len(task_info[task])
break
average_performance_info = {}
for i in range(n):
perf = 0
num_tasks = 0
for task in task_info:
perf += float(task_info[task][str(i)])
num_tasks += 1
average_performance_info[f"{i}"] = str(round(perf / num_tasks, 2))
return average_performance_info
def create_leaderboards(
self,
sheet_names_list=None,
task_names_list=["Task-Classification", "Task-Extraction", "Task-Generation"],
task_types_list=["cls", "ext", "gen"],
output_path=None):
"""Create a leaderboard JSON file from Excel data.
Args:
sheet_names_list (list, optional): List of sheet names to process.
task_names_list (list, optional): List of task names corresponding to sheets.
task_types_list (list, optional): List of task types for each sheet.
leaderboard_name (str, optional): Name of the leaderboard being created.
output_path (str, optional): Path where the JSON file should be saved.
Note:
Creates one leaderboard per call (CoT, Direct, or Few-Shot).
The output JSON contains model information, task performance, and metadata.
"""
data = {}
model_info, domain_info, license_info, accessibility_info, displayed_size_info, hidden_size_info, T_info = self.get_model_information()
task_info = self.get_sheet_information(sheet_names_list, task_names_list, task_types_list)
average_performance_info = self.add_average_performance(task_info)
data["T"] = T_info
data["Model"] = model_info
data["Model: Domain"] = domain_info
data["Model: License"] = license_info
data["Model: Accessibility"] = accessibility_info
data["Size (B)"] = displayed_size_info
data["Model: Size Range"] = hidden_size_info
data["Average Performance"] = average_performance_info
for task in task_info:
data[task] = task_info[task]
with open(output_path, 'w') as file:
json.dump(data, file, indent=4)
def create_task_information(self, output_path: str):
"""Create a JSON file containing detailed task information.
Args:
output_path (str): Path where the task information JSON should be saved.
Note:
Extracts task metadata from the "Task-all" sheet including language,
task type, clinical context, data access requirements, applications,
and clinical stage information.
"""
task_sheet = self.load_sheet("Task-all")
# Initialize a map to store the json information
info = {}
# Iterate through the "Task-Original" column, which contains all of the task names
for idx, task in enumerate(task_sheet["Task name"]):
# Add the task to the final json
if task not in info:
info[task] = {}
# Add all of the attributes to the task
language = task_sheet["Language"][idx]
task_type = task_sheet["Task Type - fine grained"][idx]
clinical_context = task_sheet["Clinical context"][idx]
data_access = task_sheet["Data Access\nOpen Access (OA) / \nRegulated (R) / \nPhysionet (P) / \nn2c2 (N)"][idx]
application = task_sheet['Clinical Application'][idx]
clinical_stage = task_sheet['Clinical Stage'][idx]
info[task]["Language"] = language.strip()
info[task]["Task Type"] = task_type.strip()
info[task]["Clinical Context"] = clinical_context.strip()
info[task]["Data Access"] = DATA_ACCESS_MAP[data_access.strip()]
info[task]['Applications'] = application.strip()
info[task]['Clinical Stage'] = clinical_stage.strip()
with open(output_path, 'w') as file:
json.dump(info, file, indent=4) |