# (removed: non-code page-scrape residue — "Spaces / Sleeping" header)
import json
from collections import Counter
from typing import Optional

import numpy as np
def make_id(config: dict, keys_to_ignore: list[str]) -> str:
    """Build a stable identifier string from a config dict.

    Joins the stringified values of every key not listed in
    ``keys_to_ignore``, in sorted key order, separated by underscores.

    Args:
        config: configuration mapping to identify.
        keys_to_ignore: keys excluded from the identifier.

    Returns:
        The underscore-joined identifier (empty string if nothing remains).
    """
    # dict keys are already unique, so wrapping them in set() was redundant.
    return "_".join(str(config[k]) for k in sorted(config) if k not in keys_to_ignore)
class ModelBenchmarkData:
    """Loads one device's benchmark JSON and derives latency metrics from it.

    The JSON maps scenario name -> {"measures": [...], "metadata": {"config": {...}}};
    each measure records token timestamps and latency figures.
    """

    def __init__(self, json_path: str) -> None:
        """Load all benchmark measurements from ``json_path``."""
        with open(json_path, "r") as f:
            self.data = json.load(f)

    def compute_e2e_latency(self, measures: dict) -> float:
        """Return the recorded end-to-end latency of one measurement.

        NOTE(review): the original annotation was ``tuple[float, Optional[float]]``,
        but a single stored value is returned and downstream code feeds it to
        ``np.mean``, so ``float`` appears intended — confirm against the JSON schema.
        """
        return measures["e2e_latency"]

    def compute_ttft(self, measures: dict) -> float:
        """Time-to-first-token: first token timestamp minus wall-clock start."""
        return measures["t_tokens"][0] - measures["wall_time_start"]

    def compute_itl(self, measures: dict) -> Optional[float]:
        """Mean inter-token latency, or ``None`` when fewer than two tokens exist."""
        t_tokens = measures["t_tokens"]
        if len(t_tokens) < 2:
            return None
        # Average gap between consecutive tokens over the whole generation.
        return (t_tokens[-1] - t_tokens[0]) / (len(t_tokens) - 1)

    def get_main_batch_size(self) -> int:
        """Return the batch size occurring most often across all measurements."""
        counts = Counter(
            measure["batch_size"]
            for scenario in self.data.values()  # scenario names are not needed
            for measure in scenario["measures"]
        )
        return counts.most_common(1)[0][0]

    def get_bar_plot_data(self, collapse_on_cache: bool = True, collapse_on_compile_mode: bool = True) -> dict:
        """Aggregate per-scenario latency metrics for bar plotting.

        Scenarios whose configs differ only in the collapsed keys are merged,
        keeping the variant with the lowest mean e2e latency.

        Args:
            collapse_on_cache: ignore ``use_cache`` when grouping scenarios.
            collapse_on_compile_mode: ignore ``compile_mode`` when grouping.

        Returns:
            Mapping scenario name -> {"ttft", "itl", "e2e", "config"}.
        """
        # Gather data for each scenario.
        per_scenario_data = {}
        for cfg_name, scenario in self.data.items():
            per_scenario_data[cfg_name] = {
                "ttft": [self.compute_ttft(m) for m in scenario["measures"]],
                "itl": [self.compute_itl(m) for m in scenario["measures"]],
                "e2e": [self.compute_e2e_latency(m) for m in scenario["measures"]],
                "config": scenario["metadata"]["config"],
            }
        # BUG FIX: collapsing previously ran only when collapse_on_cache was set,
        # so collapse_on_compile_mode alone had no effect despite being handled
        # in keys_to_ignore below.
        if collapse_on_cache or collapse_on_compile_mode:
            # Loop-invariant: which config keys are ignored when building ids.
            keys_to_ignore = ["name"]
            if collapse_on_cache:
                keys_to_ignore.append("use_cache")
            if collapse_on_compile_mode:
                keys_to_ignore.append("compile_mode")
            # For each collapsed id keep the scenario with the lowest mean e2e.
            best: dict[str, tuple[str, float]] = {}
            for cfg_name, entry in per_scenario_data.items():
                cfg_id = make_id(entry["config"], keys_to_ignore)
                cfg_e2e = float(np.mean(entry["e2e"]))
                _, best_e2e = best.get(cfg_id, (None, float("inf")))
                if cfg_e2e < best_e2e:
                    best[cfg_id] = (cfg_name, cfg_e2e)
            per_scenario_data = {name: per_scenario_data[name] for name, _ in best.values()}
        return per_scenario_data
def load_data(keep_common_scenarios_only: bool = False) -> dict[str, ModelBenchmarkData]:
    """Load benchmark data for both devices from their JSON files.

    Args:
        keep_common_scenarios_only: when True, restrict each device's data to
            the scenarios present on *both* devices so they compare like-for-like.

    Returns:
        Mapping of device name ("MI325", "H100") to its benchmark data.
    """
    data = {
        "MI325": ModelBenchmarkData("mi325_data.json"),
        "H100": ModelBenchmarkData("h100_data.json"),
    }
    if keep_common_scenarios_only:
        common_scenarios = set(data["MI325"].data.keys()) & set(data["H100"].data.keys())
        # Only the values are needed here; the device name is irrelevant.
        for device_data in data.values():
            device_data.data = {k: v for k, v in device_data.data.items() if k in common_scenarios}
    return data