|
|
|
|
|
|
|
|
import json |
|
|
import os |
|
|
import re |
|
|
from collections import defaultdict |
|
|
from statistics import mean |
|
|
|
|
|
import matplotlib.pyplot as plt |
|
|
from matplotlib.colors import LinearSegmentedColormap |
|
|
from rich import print |
|
|
|
|
|
|
|
|
def normalize_to_base_name(name: str) -> str:
    """Turn a raw model identifier into its canonical display name.

    Variant markers ("Chat", "instruct", "--v0.1", "-base", "-preview") are
    removed, a few known families get their canonical capitalisation, dashes
    left dangling at either end are stripped, and a lowercase ``b`` that
    directly follows digits is upper-cased (e.g. ``7b`` -> ``7B``).
    """
    # Applied strictly in this order: later entries see the output of
    # earlier ones, exactly like a chain of ``str.replace`` calls.
    rewrites = (
        ("Chat", ""),
        ("instruct", ""),
        ("code-llama", "CodeLlama"),
        ("deepseek-coder", "DeepSeek-Coder"),
        ("gpt-4-turbo", "GPT-4-Turbo"),
        ("starcoder", "StarCoder"),
        ("--v0.1", ""),
        ("-base", ""),
        ("-preview", ""),
    )
    for needle, replacement in rewrites:
        name = name.replace(needle, replacement)

    trimmed = name.strip("-")
    return re.sub(r"(\d+)b", r"\1B", trimmed)
|
|
|
|
|
|
|
|
def load_dps_scores(path: str, norm: bool = False):
    """Read a result JSON file and return one score per task.

    Args:
        path: JSON file mapping task ids to per-task result objects.
        norm: When true, read the normalised fields ("norm_scores" /
            "dps_norm") instead of the raw ones ("scores" / "dps").

    Returns:
        A dict ``task_id -> score``. Tasks with neither field set (missing
        or null) are left out.
    """
    if norm:
        current_field, legacy_field = "norm_scores", "dps_norm"
    else:
        current_field, legacy_field = "scores", "dps"

    with open(path) as handle:
        per_task = json.load(handle)

    averaged = {}
    for task_id, entry in per_task.items():
        # Newer layout: an object carrying a precomputed "avg".
        current = entry.get(current_field)
        if current is not None:
            averaged[task_id] = current["avg"]

        # Legacy layout: a list of samples averaged here. It is checked
        # second, so it wins when a task carries both layouts.
        legacy = entry.get(legacy_field)
        if legacy is not None:
            averaged[task_id] = mean(legacy)

    return averaged
|
|
|
|
|
|
|
|
|
|
|
def parse_model_and_type(result_json: str):
    """Split a result file name into (model family, size, type).

    The name must contain ``_temp_0.2_``. The part before it is the model id
    (normalised with ``normalize_to_base_name``); the part after it is split
    on ``_`` and its second field is taken as the type.

    Returns:
        ``(model_id, nb, type)`` where ``nb`` is the parameter count as a
        string without the trailing ``B`` (e.g. ``"6.7"``), or ``None`` when
        the model id carries no ``<number>B`` token. When a size is found it
        is removed from the returned ``model_id``.
    """
    marker = "_temp_0.2_"
    assert marker in result_json, f"Invalid result file name: {result_json}"

    raw_model, suffix = result_json.split(marker)
    variant = suffix.split("_")[1]
    model_id = normalize_to_base_name(raw_model)

    size_match = re.search(r"(\d+(?:\.\d+)?)B", model_id)
    if size_match is None:
        return model_id, None, variant

    print(size_match)
    nb = size_match.group(1)
    family = model_id.replace(f"{nb}B", "").strip("-")
    return family, nb, variant
|
|
|
|
|
|
|
|
def load_groups_from_directory(result_dir: str, norm: bool = False):
    """Collect per-task scores for every usable result file in a directory.

    Only ``*.json`` files are considered. A file is kept when its type is
    "instruct", or when it is the "base" variant of StarCoder2, and only if
    a parameter count could be parsed from its name.

    Args:
        result_dir: Directory holding the result JSON files.
        norm: Forwarded to ``load_dps_scores``.

    Returns:
        ``{"<family> (<type>)": {"<size>B": {task_id: score}}}`` with the
        sizes of each family ordered from largest to smallest.
    """
    by_family = defaultdict(dict)

    for result_json in os.listdir(result_dir):
        if not result_json.endswith(".json"):
            continue

        model_id, nb, type = parse_model_and_type(result_json)
        wanted = type == "instruct" or (type == "base" and model_id == "StarCoder2")
        if not wanted or not nb:
            continue

        print(f"{type = :<16}\t{model_id = } {nb = }")
        family = f"{model_id} ({type})"
        by_family[family][nb] = load_dps_scores(
            os.path.join(result_dir, result_json), norm
        )

    def largest_first(item):
        # Sizes are numeric strings such as "6.7"; negate for descending order.
        return -float(item[0])

    return {
        family: {
            f"{size}B": scores
            for size, scores in sorted(sizes.items(), key=largest_first)
        }
        for family, sizes in by_family.items()
    }
|
|
|
|
|
|
|
|
def compute_score_matrix(group: dict):
    """Build the pairwise comparison matrix for the variants in ``group``.

    Args:
        group: Maps a variant label to its ``{task_id: score}`` dict.

    Returns:
        A square list of lists indexed in ``group``'s key order. For
        ``j > i`` the cell holds ``(mean_i, mean_j)``, both averaged over the
        tasks the two variants share, or ``None`` when they share no task.
        Cells on and below the diagonal are the placeholder ``(0, 0)``.
    """
    labels = list(group)
    matrix = []

    for row, label_x in enumerate(labels):
        # Diagonal and lower triangle carry no comparison.
        cells = [(0, 0)] * (row + 1)

        for label_y in labels[row + 1 :]:
            scores_x = group[label_x]
            scores_y = group[label_y]
            shared = set(scores_x.keys()) & set(scores_y.keys())

            if not shared:
                cells.append(None)
                print(f"No common tasks between {label_x} and {label_y}")
                continue

            avg_x = mean([scores_x[task_id] for task_id in shared])
            avg_y = mean([scores_y[task_id] for task_id in shared])
            print(label_x, avg_x, " --- ", label_y, avg_y)
            cells.append((avg_x, avg_y))

        matrix.append(cells)

    return matrix
|
|
|
|
|
|
|
|
def main(result_dir: str, norm: bool = False, latex: bool = False):
    """Plot how parameter count affects the score within each model family.

    One heatmap is drawn per family that has results for at least two sizes.
    Cell (i, j) with j > i compares size i against size j on their common
    tasks: the colour encodes the score difference (clamped to [-25, 25]) and
    the two averaged scores are printed inside the cell. The figure is saved
    in the current directory as ``perf_param_impact[_norm].png`` and ``.pdf``.

    Args:
        result_dir: Directory containing the result JSON files.
        norm: Use normalised scores; also appends ``_norm`` to the file name.
        latex: Render text with LaTeX and typeset family names in bold.
    """
    if latex:
        plt.rc("text", usetex=True)
        plt.rc("text.latex", preamble=r"\usepackage{xfrac}")
    assert os.path.isdir(result_dir), f"{result_dir} is not a directory."

    groups = load_groups_from_directory(result_dir, norm=norm)
    # A comparison needs at least two sizes of the same family.
    groups = {k: v for k, v in groups.items() if len(v) >= 2}
    groups = dict(sorted(groups.items()))

    n_grp = len(groups)
    if n_grp == 0:
        # plt.subplots rejects a zero-row grid, so stop before building one.
        print(f"No model family in {result_dir} has at least two sizes to compare.")
        return

    max_grp_per_row = 3
    n_row = (n_grp + max_grp_per_row - 1) // max_grp_per_row

    # squeeze=False always yields a 2-D axes array; flattening it lets the
    # k-th group address the k-th panel no matter how many rows there are.
    _, axs = plt.subplots(
        n_row,
        max_grp_per_row,
        figsize=(2 * max_grp_per_row, 2 * n_row),
        constrained_layout=True,
        squeeze=False,
    )
    axes = axs.flatten()

    cmap = LinearSegmentedColormap.from_list("rg", ["r", "w", "lime"], N=256)
    gapx = 0.15
    gapy = 0.25

    for k, (model, group) in enumerate(groups.items()):
        grp_keys = list(group.keys())
        score_matrix = compute_score_matrix(group)
        # None marks a pair without common tasks; draw it as neutral (0).
        score_matrix_diff = [
            [0 if score is None else score[0] - score[1] for score in score_list]
            for score_list in score_matrix
        ]
        ax: plt.Axes = axes[k]

        cax = ax.matshow(score_matrix_diff, cmap=cmap)
        cax.set_clim(-25, 25)
        ax.set_xticks(range(len(grp_keys)))
        ax.set_yticks(range(len(grp_keys)))
        ax.set_xticklabels(grp_keys, rotation=30, ha="left", rotation_mode="anchor")
        ax.set_yticklabels(grp_keys)
        ax.tick_params(bottom=False)

        for i, score_list in enumerate(score_matrix):
            for j in range(i + 1, len(grp_keys)):
                score = score_list[j]
                if score is None:
                    continue
                x, y = score
                if x == 0 and y == 0:
                    continue
                # Row variant's score goes bottom-left, column variant's
                # top-right; the higher of the two is shown in green.
                ax.text(
                    j - gapx,
                    i + gapy,
                    f"{x:.1f}",
                    va="center",
                    ha="center",
                    color="green" if x > y else "red",
                )
                ax.text(
                    j + gapx,
                    i - gapy,
                    f"{y:.1f}",
                    va="center",
                    ha="center",
                    color="green" if x < y else "red",
                )

        xlabel = model
        if latex:
            xlabel = r"\textbf{" + xlabel + "}"
        ax.set_xlabel(xlabel)

    imname = "perf_param_impact"
    if norm:
        imname += "_norm"
    plt.savefig(f"{imname}.png", dpi=100, bbox_inches="tight")
    plt.savefig(f"{imname}.pdf", dpi=100, bbox_inches="tight")
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Expose main() as a command-line interface; its parameters become flags.
    import fire

    fire.Fire(main)
|
|
|