File size: 7,177 Bytes
a9bd396
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

import numpy as np

from .hardware_metrics import GPURawMetrics, HardwareInfo


def compute_basic_statistics(measurements: list[float]) -> dict[str, float]:
    """Compute summary statistics (mean, std, min, median, max, 95th percentile).

    Args:
        measurements: sample values (e.g. latencies in seconds); may be empty.

    Returns:
        Mapping of statistic name ("avg", "std", "min", "med", "max", "p95")
        to its value. All-zero statistics when *measurements* is empty.
    """
    # Hoist the emptiness check once instead of repeating it per statistic.
    if not measurements:
        return {"avg": 0, "std": 0, "min": 0, "med": 0, "max": 0, "p95": 0}
    return {
        "avg": np.mean(measurements),
        "std": np.std(measurements),
        "min": np.min(measurements),
        "med": np.median(measurements),
        "max": np.max(measurements),
        "p95": np.percentile(measurements, 95),
    }


def add_unit_to_duration(stats: dict[str, float]) -> dict[str, str]:
    """Format each duration in *stats* (seconds) as a human-readable string.

    Picks the largest unit (hr/min/s/ms/us/ns) whose threshold the value
    exceeds. Mutates *stats* in place and returns it.
    """
    for key, seconds in list(stats.items()):
        if seconds > 3600:
            formatted = f"{(seconds / 3600):.2f}hr"
        elif seconds > 60:
            formatted = f"{(seconds / 60):.2f}min"
        elif seconds > 1:
            formatted = f"{seconds:.2f}s"
        elif seconds > 1e-3:
            formatted = f"{(seconds * 1e3):.2f}ms"
        elif seconds > 1e-6:
            formatted = f"{(seconds * 1e6):.2f}us"
        else:
            formatted = f"{(seconds * 1e9):.2f}ns"
        stats[key] = formatted
    return stats


def equalize_lengths_and_collate(stats: dict[str, dict[str, str]]) -> dict[str, str]:
    """Note: This operation is destructive as it will update values in place before returning a new correctly formatted dict"""
    columns = ["avg", "std", "min", "med", "max", "p95"]
    # Right-pad every column to its widest entry so values line up vertically.
    for column in columns:
        width = max(len(entry[column]) for entry in stats.values())
        for entry in stats.values():
            entry[column] = entry[column].ljust(width, " ")
    # Collate each row into a single "key=value key=value ..." line.
    collated: dict[str, str] = {}
    for name, entry in stats.items():
        collated[name] = " ".join(f"{column}={entry[column]}" for column in columns)
    return collated


def pretty_print_dict(data: dict[str, str], tabs: int = 0) -> None:
    """Print *data* as aligned ``key...: value`` lines.

    Args:
        data: mapping to display; keys are right-padded with dots so every
            value starts in the same column.
        tabs: number of two-space indentation levels to prefix each line.
    """
    # default=0 keeps an empty dict from raising ValueError on max();
    # the loop then simply prints nothing.
    max_key_length = max((len(key) for key in data), default=0)
    indent = "  " * tabs  # loop-invariant, hoisted
    for key, value in data.items():
        padded_key = key.ljust(max_key_length + 1, ".")
        print(f"{indent}{padded_key}: {value}")


@dataclass
class BenchmarkMetadata:
    """Metadata collected for each benchmark run."""

    model_id: str
    timestamp: str
    branch_name: str
    commit_id: str
    commit_message: str
    hardware_info: HardwareInfo
    success: bool

    def __init__(
        self, model_id: str, commit_id: str, branch_name: str = "main", commit_message: str = "", success: bool = True
    ) -> None:
        # Stamp creation time (UTC, ISO-8601) and probe the host hardware;
        # everything else is passed through as-is.
        self.timestamp = datetime.now(timezone.utc).isoformat()
        self.hardware_info = HardwareInfo()
        self.model_id = model_id
        self.commit_id = commit_id
        self.branch_name = branch_name
        self.commit_message = commit_message
        self.success = success

    def to_dict(self) -> dict[str, Any]:
        """Serialize the metadata to plain, JSON-compatible types."""
        payload: dict[str, Any] = {
            name: getattr(self, name)
            for name in ("model_id", "timestamp", "branch_name", "commit_id", "commit_message")
        }
        payload["hardware_info"] = self.hardware_info.to_dict()
        payload["success"] = self.success
        return payload


class BenchmarkResult:
    """Result from a series of benchmark runs.

    Each call to :meth:`accumulate` appends one run's measurements; all list
    attributes stay index-aligned (index i describes run i).
    """

    def __init__(self) -> None:
        self.e2e_latency: list[float] = []
        self._timestamps: list[list[list[float]]] = []
        self.time_to_first_token: list[float] = []
        self.inter_token_latency: list[float] = []
        self.shape_and_decoded_outputs: list[str] = []
        self.gpu_metrics: list["GPURawMetrics | None"] = []

    def accumulate(
        self,
        e2e_latency: float,
        # NOTE: annotation corrected from list[float] — _accumulate_ttft_and_itl
        # indexes this as a 2D grid (timestamps[:, 0]), so rows of per-token
        # timestamps are required.
        timestamps: list[list[float]],
        shape_and_decoded_output: str,
        gpu_metrics: "GPURawMetrics | None",
    ) -> None:
        """Record the measurements of one benchmark run.

        Args:
            e2e_latency: end-to-end latency of the run, in seconds.
            timestamps: per-token timestamps, one row per sequence; assumed
                rectangular with at least two token columns — TODO confirm
                with callers (a single-column grid divides by zero below).
            shape_and_decoded_output: summary string of the generated output.
            gpu_metrics: raw GPU metrics for the run, or None if unavailable.
        """
        self.e2e_latency.append(e2e_latency)
        self._timestamps.append(timestamps)
        self._accumulate_ttft_and_itl(timestamps)
        self.shape_and_decoded_outputs.append(shape_and_decoded_output)
        self.gpu_metrics.append(gpu_metrics)

    def _accumulate_ttft_and_itl(self, timestamps: list[list[float]]) -> None:
        """Derive time-to-first-token and inter-token latency from one run's timestamps."""
        grid = np.array(timestamps)  # shape: (num_sequences, num_tokens)
        # Earliest first-token timestamp across sequences. (Renamed from the
        # original local "tftt" typo.)
        ttft = np.min(grid[:, 0])
        # Mean total span divided by the number of inter-token gaps.
        itl = np.mean(grid[:, -1] - grid[:, 0]) / (grid.shape[1] - 1)
        self.time_to_first_token.append(ttft)
        self.inter_token_latency.append(itl)

    def to_dict(self, summarized: bool = False) -> dict[str, Any]:
        """Serialize to a dict; *summarized* drops the bulky per-run data."""
        # Save GPU metrics as None if it contains only None values or if we are summarizing
        if summarized or all(gm is None for gm in self.gpu_metrics):
            gpu_metrics = None
        else:
            gpu_metrics = [gm.to_dict() for gm in self.gpu_metrics]
        return {
            "e2e_latency": self.e2e_latency,
            "time_to_first_token": self.time_to_first_token,
            "inter_token_latency": self.inter_token_latency,
            "shape_and_decoded_outputs": self.shape_and_decoded_outputs,
            "gpu_metrics": gpu_metrics,
            "timestamps": None if summarized else self._timestamps,
        }

    @classmethod
    # NOTE: annotation corrected from dict[str, None | int | float] — the
    # values produced by to_dict are lists (or None), not scalars.
    def from_dict(cls, data: dict[str, Any]) -> "BenchmarkResult":
        """Rebuild a BenchmarkResult from the output of :meth:`to_dict`."""
        # Handle GPU metrics, which is saved as None if it contains only None values
        if data["gpu_metrics"] is None:
            gpu_metrics = [None for _ in range(len(data["e2e_latency"]))]
        else:
            gpu_metrics = [GPURawMetrics.from_dict(gm) for gm in data["gpu_metrics"]]
        # Handle timestamps, which can be saved as None to reduce file size
        if data["timestamps"] is None:
            timestamps = [None for _ in range(len(data["e2e_latency"]))]
        else:
            timestamps = data["timestamps"]
        # Create a new instance and accumulate the data
        new_instance = cls()
        new_instance.e2e_latency = data["e2e_latency"]
        new_instance._timestamps = timestamps
        new_instance.time_to_first_token = data["time_to_first_token"]
        new_instance.inter_token_latency = data["inter_token_latency"]
        new_instance.shape_and_decoded_outputs = data["shape_and_decoded_outputs"]
        new_instance.gpu_metrics = gpu_metrics
        return new_instance

    def get_throughput(self, total_generated_tokens: int) -> list[float]:
        """Per-run throughput in tokens/second (tokens divided by e2e latency)."""
        return [total_generated_tokens / e2e_latency for e2e_latency in self.e2e_latency]

    def pprint(self, batch_size: int = 0, num_generated_tokens: int = 0, tabs: int = 0) -> None:
        """Pretty-print summary statistics for all accumulated runs.

        Args:
            batch_size: if > 0, also prints throughput computed from
                batch_size * num_generated_tokens tokens per run.
            num_generated_tokens: tokens generated per sequence, used only
                for the throughput line.
            tabs: indentation level forwarded to pretty_print_dict.
        """
        measurements = {
            "E2E Latency": add_unit_to_duration(compute_basic_statistics(self.e2e_latency)),
            "Time to First Token": add_unit_to_duration(compute_basic_statistics(self.time_to_first_token)),
        }
        if len(self.inter_token_latency) > 0:
            measurements["Inter-Token Latency"] = add_unit_to_duration(
                compute_basic_statistics(self.inter_token_latency)
            )
        if batch_size > 0:
            throughput_stats = compute_basic_statistics(self.get_throughput(batch_size * num_generated_tokens))
            measurements["Throughput"] = {key: f"{value:.2f}tok/s" for key, value in throughput_stats.items()}
        dict_to_pprint = equalize_lengths_and_collate(measurements)
        pretty_print_dict(dict_to_pprint, tabs=tabs)