| |
|
|
| import json |
| import os |
| import re |
| from collections import defaultdict |
| from statistics import mean |
|
|
| import matplotlib.pyplot as plt |
| from matplotlib.colors import LinearSegmentedColormap |
| from rich import print |
|
|
|
|
def normalize_to_base_name(name: str) -> str:
    """Canonicalize a raw model identifier into a display base name.

    Strips chat/instruct/version suffixes, maps known model families to
    their branded capitalization, and upper-cases the parameter-count
    suffix (e.g. ``7b`` -> ``7B``).
    """
    # Applied in order; family renames must run before the size regex.
    replacements = (
        ("Chat", ""),
        ("instruct", ""),
        ("code-llama", "CodeLlama"),
        ("deepseek-coder", "DeepSeek-Coder"),
        ("gpt-4-turbo", "GPT-4-Turbo"),
        ("starcoder", "StarCoder"),
        ("--v0.1", ""),
        ("-base", ""),
        ("-preview", ""),
    )
    for old, new in replacements:
        name = name.replace(old, new)
    name = name.strip("-")
    # Upper-case the "b" in parameter counts such as "7b" or "34b".
    return re.sub(r"(\d+)b", r"\1B", name)
|
|
|
|
def load_dps_scores(path: str, norm: bool = False):
    """Load per-task DPS scores from a result JSON file.

    Supports both the current schema ("scores" / "norm_scores" entries
    carrying a precomputed "avg") and the legacy schema ("dps" /
    "dps_norm" entries carrying a raw list of scores). When a task has
    both, the legacy value wins — same lookup order as before.

    :param path: path to the result JSON file.
    :param norm: load normalized scores instead of raw ones.
    :return: mapping of task id to its average score.
    """
    with open(path) as f:
        results = json.load(f)

    # Key selection does not depend on the task, so hoist it out of the loop.
    current_key = "norm_scores" if norm else "scores"
    legacy_key = "dps_norm" if norm else "dps"

    task2score = {}
    for task_id, result in results.items():
        if result.get(current_key) is not None:
            task2score[task_id] = result[current_key]["avg"]
        if result.get(legacy_key) is not None:
            task2score[task_id] = mean(result[legacy_key])
    return task2score
|
|
|
|
| |
def parse_model_and_type(result_json: str):
    """Split a result file name into (model name, size in billions, prompt type).

    File names look like ``<model>_temp_0.2_<run>_<type>_....json``; the
    prompt type is the second ``_``-separated token after the marker.
    The parameter count (e.g. ``"6.7"`` from ``"6.7B"``) is extracted
    from the normalized model name and is ``None`` when absent.

    :raises AssertionError: if the file name lacks the "_temp_0.2_" marker.
    """
    assert "_temp_0.2_" in result_json, f"Invalid result file name: {result_json}"
    model_id, rest = result_json.split("_temp_0.2_")
    prompt_type = rest.split("_")[1]
    model_id = normalize_to_base_name(model_id)
    # Pull the parameter count out of the name so "CodeLlama-7B" becomes
    # ("CodeLlama", "7"); fractional sizes like "6.7B" are supported.
    size_match = re.search(r"(\d+(?:\.\d+)?)B", model_id)
    if size_match:
        # NOTE: a leftover debug print of the raw Match object was removed here.
        nb = size_match.group(1)
        model_id = model_id.replace(f"{nb}B", "").strip("-")
    else:
        nb = None
    return model_id, nb, prompt_type
|
|
|
|
def load_groups_from_directory(result_dir: str, norm: bool = False):
    """Group per-task scores by model, keyed by prompt type.

    Scans *result_dir* for ``*.json`` result files and returns a mapping
    of ``"<model> (<size>B)"`` (plain model name when no size is known)
    to a dict of prompt type -> {task_id: score}.
    """
    groups = defaultdict(dict)
    json_files = (f for f in os.listdir(result_dir) if f.endswith(".json"))
    for result_json in json_files:
        model_id, nb, prompt_type = parse_model_and_type(result_json)
        # Same output as the original f"{type = :<16}\t{model_id = }" debug line.
        print(f"type = {prompt_type:<16}\tmodel_id = {model_id!r}")
        if nb:
            model_id = f"{model_id} ({nb}B)"
        scores = load_dps_scores(os.path.join(result_dir, result_json), norm)
        groups[model_id][prompt_type] = scores
    return groups
|
|
|
|
| TYPES = ["perf-CoT", "perf-instruct", "instruct", "base"] |
|
|
|
|
def compute_score_matrix(group: dict, types=None):
    """Build the upper-triangular pairwise score matrix for one model.

    For every ordered pair of prompt types (x, y) with y after x, each
    side's scores are averaged over the tasks the two runs share. Cells
    on or below the diagonal, pairs missing a run, and pairs with no
    common tasks all yield ``(0, 0)`` so downstream rendering can skip
    them uniformly.

    :param group: mapping of prompt type -> {task_id: score}.
    :param types: prompt types to compare; defaults to module-level TYPES.
    :return: square list-of-lists of ``(score_x, score_y)`` tuples.
    """
    if types is None:
        types = TYPES
    score_matrix = []
    for i, type_x in enumerate(types):
        score_list = []
        for j, type_y in enumerate(types):
            if j <= i or type_y not in group or type_x not in group:
                score_list.append((0, 0))
                continue
            task2dps_x = group[type_x]
            task2dps_y = group[type_y]
            common_tasks = set(task2dps_x.keys()) & set(task2dps_y.keys())
            if not common_tasks:
                # BUGFIX: was `None`, which crashed the diff computation in
                # main(); (0, 0) is skipped there like every other empty cell.
                score_list.append((0, 0))
                print(f"No common tasks between {type_x} and {type_y}")
                continue
            dps_x = mean(task2dps_x[task_id] for task_id in common_tasks)
            dps_y = mean(task2dps_y[task_id] for task_id in common_tasks)
            print(type_x, dps_x, " --- ", type_y, dps_y)
            score_list.append((dps_x, dps_y))
        score_matrix.append(score_list)
    return score_matrix
|
|
|
|
def main(result_dir: str, norm: bool = False, latex: bool = False):
    """Plot pairwise prompt-type score gaps as one heatmap per model.

    Loads result JSONs from *result_dir*, keeps models evaluated with at
    least two prompt types, and renders an upper-triangular matrix per
    model: the cell color encodes the score gap between the two prompt
    types on their common tasks, and the two raw scores are printed in
    the cell. Saves ``perf_prompt_impact[_norm].png`` and ``.pdf``.

    :param result_dir: directory of ``*_temp_0.2_*.json`` result files.
    :param norm: use normalized scores instead of raw ones.
    :param latex: render labels with LaTeX (requires a TeX install).
    """
    if latex:
        plt.rc("text", usetex=True)
        plt.rc("text.latex", preamble=r"\usepackage{xfrac}")
    assert os.path.isdir(result_dir), f"{result_dir} is not a directory."

    groups = load_groups_from_directory(result_dir, norm=norm)
    # Comparing prompt types only makes sense with >= 2 of them per model.
    groups = {k: v for k, v in groups.items() if len(v) >= 2}
    groups = dict(sorted(groups.items()))

    n_grp = len(groups)
    max_grp_per_row = 5
    n_row = (n_grp + max_grp_per_row - 1) // max_grp_per_row

    # BUGFIX: squeeze=False keeps `axs` 2-D even for a single row, so the
    # axs[row, col] indexing below no longer breaks when n_grp <= 5.
    fig, axs = plt.subplots(
        n_row,
        max_grp_per_row,
        figsize=(2 * max_grp_per_row, 2 * n_row),
        constrained_layout=True,
        squeeze=False,
    )

    for i, (model, group) in enumerate(groups.items()):
        score_matrix = compute_score_matrix(group)
        # Tolerate None cells (no common tasks) by treating them as 0 gap.
        score_matrix_diff = [
            [0 if score is None else score[0] - score[1] for score in score_list]
            for score_list in score_matrix
        ]
        ax: plt.Axes = axs[i // max_grp_per_row, i % max_grp_per_row]
        cmap = LinearSegmentedColormap.from_list("rg", ["r", "w", "lime"], N=256)
        cax = ax.matshow(score_matrix_diff, cmap=cmap)
        cax.set_clim(-15, 15)
        ax.set_xticks(range(len(TYPES)))
        ax.set_yticks(range(len(TYPES)))
        # Only the outer edge of the grid carries tick labels and marks.
        if i // max_grp_per_row == 0:
            ax.set_xticklabels(TYPES, rotation=30, ha="left", rotation_mode="anchor")
        else:
            ax.set_xticklabels([])
            ax.tick_params(top=False)
        if i % max_grp_per_row == 0:
            ax.set_yticklabels(TYPES)
        else:
            ax.set_yticklabels([])
            ax.tick_params(left=False)
        ax.tick_params(bottom=False)

        # Annotate each upper-triangular cell with both raw scores:
        # row-type score bottom-left, column-type score top-right.
        # BUGFIX: loop indices renamed so they no longer shadow the outer
        # model index `i`.
        for ti in range(len(TYPES)):
            for tj in range(len(TYPES)):
                if tj <= ti:
                    continue
                cell = score_matrix[ti][tj]
                if cell is None:
                    continue
                x, y = cell
                if x == 0 and y == 0:
                    continue  # pair not evaluated / nothing to compare
                gapx = 0.15
                gapy = 0.25
                ax.text(
                    tj - gapx,
                    ti + gapy,
                    f"{x:.1f}",
                    va="center",
                    ha="center",
                    color="green" if x > y else "red",
                )
                ax.text(
                    tj + gapx,
                    ti - gapy,
                    f"{y:.1f}",
                    va="center",
                    ha="center",
                    color="green" if x < y else "red",
                )
        xlabel = model
        if latex:
            xlabel = r"\textbf{" + xlabel + "}"
        ax.set_xlabel(xlabel)

    # Hide axes left empty when n_grp does not fill the last row.
    for k in range(n_grp, n_row * max_grp_per_row):
        axs[k // max_grp_per_row, k % max_grp_per_row].axis("off")

    imname = "perf_prompt_impact"
    if norm:
        imname += "_norm"
    plt.savefig(f"{imname}.png", dpi=100, bbox_inches="tight")
    plt.savefig(f"{imname}.pdf", dpi=100, bbox_inches="tight")
|
|
|
|
if __name__ == "__main__":
    # Expose `main` as a CLI via python-fire:
    #   python <script>.py RESULT_DIR [--norm] [--latex]
    from fire import Fire

    Fire(main)
|
|