File size: 11,765 Bytes
5000658
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
import asyncio
import json
import math
import os
import threading
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Iterable, List, Optional

import numpy as np
import torch

from tensorrt_llm import logger

from .._utils import release_gc
from ..profiler import device_memory_info, host_memory_info
from .llm import LLM, SamplingParams
from .utils import is_directory_empty, print_colored


@dataclass
class PerfItem:
    ''' Timing record of a single generation request.

    The evaluator fills ``start`` and ``end`` from ``time.time()`` taken right
    before the request is issued and right after its result is awaited.
    '''
    start: float  # timestamp (seconds) taken before the request was issued
    end: float  # timestamp (seconds) taken after the result was received
    num_out_tokens: int = 0  # number of generated tokens, summed over beams

    @property
    def latency(self) -> float:
        ''' Elapsed time of the request in seconds, i.e. ``end - start``. '''
        return self.end - self.start

    @property
    def lantency(self) -> float:
        ''' Misspelled alias of ``latency``, kept so existing callers keep working. '''
        return self.latency


@dataclass
class Report:
    ''' Aggregated result of one benchmark run.

    Holds the summary metrics plus the (optional) memory samples collected
    while the run was in progress, and knows how to print itself and how to
    dump itself as JSON.
    '''
    num_samples: int
    total_latency: float
    avg_latency: float
    seq_throughput: float
    token_throughput: float
    ave_tokens_per_sample: float
    memory_usage_samples: "MemoryContinuousMonitorThread.RecordList" = field(
        default_factory=list)

    def display(self):
        ''' Print the report to stdout.

        The memory section is only printed when memory samples are present;
        it then shows min / max / average for host memory (index 0 of each
        summary) and GPU memory (index 1 of each summary).
        '''
        print_colored("Performance Report:\n", color="bold_green")
        print(f"num_samples: {self.num_samples}")

        # All remaining scalar metrics share the same two-decimal format.
        two_decimal_metrics = (
            ("total_latency", self.total_latency),
            ("avg_latency", self.avg_latency),
            ("seq_throughput", self.seq_throughput),
            ("token_throughput", self.token_throughput),
            ("average tokens per sample", self.ave_tokens_per_sample),
        )
        for label, value in two_decimal_metrics:
            print(f"{label}: {value:.2f}")

        usage = self.memory_usage_samples
        if usage:
            print("Memory Usage:\n")
            lowest = usage.get_min()
            highest = usage.get_max()
            mean = usage.get_average()
            print(
                f"  host memory usage: (min: {lowest[0]}, max: {highest[0]}, average: {mean[0]})"
            )
            print(
                f"  gpu memory usage: (min: {lowest[1]}, max: {highest[1]}, average: {mean[1]})"
            )

        print_colored("__________________________________\n", color="green")

    def save_json(self, path: Path, **kwargs):
        ''' Write the report to ``path`` as JSON.

        Args:
            path: destination file; it is opened in write mode.
            kwargs: extra key/value pairs merged into the dumped object. They
                override report fields of the same name, except
                ``memory_usage_samples`` which is always taken from the report.
        '''
        with open(path, 'w') as out:
            payload = {**self.__dict__, **kwargs}
            # Records are dataclasses; convert them so json can serialize them.
            payload["memory_usage_samples"] = list(
                map(asdict, self.memory_usage_samples))
            json.dump(payload, out)


@dataclass
class Sample:
    ''' One benchmark input, as loaded from the dataset file. '''
    # Token ids of the prompt; passed as the input to the LLM's generate_async.
    input_ids: List[int]
    # Used by the evaluator as max_new_tokens for this request.
    output_len: int


class MemoryContinuousMonitorThread(threading.Thread):
    ''' Monitor the host memory and GPU memory usage.

    Once started, the thread takes one sample per ``sampling_interval`` seconds
    and appends it to ``memory_samples`` until ``stop()`` is called.
    '''

    @dataclass
    class Record:
        ''' One memory sample. '''
        time: float  # time.time() value when the sample was taken
        host_memory: float  # GiB
        gpu_memory: List[float]  # one value per CUDA device, GiB

        def to_dict(self):
            ''' Return the record as a plain dict. '''
            return dict(time=self.time,
                        host_memory=self.host_memory,
                        gpu_memory=self.gpu_memory)

    class RecordList(list):
        ''' A list of ``Record`` with min / max / average helpers. '''

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)

        def get_min(self):
            ''' Return ``(host_min, [per-device gpu_min, ...])``. '''
            return self._get_memory_usage('min')

        def get_max(self):
            ''' Return ``(host_max, [per-device gpu_max, ...])``. '''
            return self._get_memory_usage('max')

        def get_average(self):
            ''' Return ``(host_mean, [per-device gpu_mean, ...])``. '''
            return self._get_memory_usage('average')

        def _get_memory_usage(self, op: str):
            ''' Reduce all records with ``op`` ('min', 'max' or 'average').

            Returns:
                A 2-tuple: the reduced host memory, and a list with the
                reduced GPU memory of each device.

            Raises:
                KeyError: if ``op`` is not one of the supported names.
            '''
            host_memory = [record.host_memory for record in self]
            gpu_memory = [record.gpu_memory for record in self]

            ops = dict(min=np.min, max=np.max, average=np.mean)
            theop = ops[op]

            # zip(*gpu_memory) regroups the samples per device, so each device
            # is reduced over time independently.
            return theop(host_memory), [theop(gpu) for gpu in zip(*gpu_memory)]

    def __init__(self, sampling_interval: float = 1):
        '''
        Args:
            sampling_interval: seconds to wait between two samples.
        '''
        super().__init__()
        self.sampling_interval = sampling_interval
        self._stop_event = threading.Event()
        self.memory_samples = MemoryContinuousMonitorThread.RecordList()

    def run(self):
        ''' Thread body: sample, log and store until ``stop()`` is called. '''
        while not self._stop_event.is_set():
            record = self.monitor()
            logger.info(f'record: {record}')
            self.memory_samples.append(record)
            # Wait on the event instead of sleeping so that stop() wakes the
            # thread immediately rather than after a full sampling interval.
            self._stop_event.wait(self.sampling_interval)

    def monitor(self) -> "MemoryContinuousMonitorThread.Record":
        ''' Take a single sample of the current host and GPU memory usage. '''
        return self.Record(time.time(), get_host_memory_usage(),
                           list(get_gpu_memory_usage()))

    def stop(self):
        ''' Ask the thread to finish; callers join() it afterwards. '''
        self._stop_event.set()


def get_host_memory_usage() -> float:
    ''' Return the host memory figure of the current process, in GiB.

    The value is the first element returned by ``host_memory_info`` for this
    process id, scaled from bytes to GiB (presumably the used amount -- verify
    against ``host_memory_info``).
    '''
    bytes_per_gib = 1024**3
    current_pid = os.getpid()
    first_field = host_memory_info(current_pid)[0]
    return first_field / bytes_per_gib  # GiB


def get_gpu_memory_usage() -> Iterable[float]:
    ''' Lazily yield one memory figure per CUDA device, in GiB.

    For every device index reported by ``torch.cuda.device_count()`` the first
    element returned by ``device_memory_info`` is scaled from bytes to GiB
    (presumably the used amount -- verify against ``device_memory_info``).
    '''
    bytes_per_gib = 1024**3
    yield from (device_memory_info(index)[0] / bytes_per_gib
                for index in range(torch.cuda.device_count()))


class LLMPerfEvaluator:
    '''
    Run a set of samples through an LLM and report latency and throughput.

    Typical usage is ``LLMPerfEvaluator.create(...)`` followed by ``run()``;
    the instance is also a context manager whose exit releases the LLM and
    stops the optional memory monitor thread.
    '''

    @classmethod
    def create(cls,
               model: str,
               samples_path: Path,
               num_samples: int = -1,
               warmup: int = 100,
               batch_size: int = -1,
               engine_cache_path: Optional[Path] = None,
               memory_monitor_interval: Optional[int] = None,
               **kwargs) -> 'LLMPerfEvaluator':
        '''
        Build the LLM, load the dataset and return a ready-to-run evaluator.

        Args:
            model: The model name or a local path to the model directory.
            samples_path: path to the input data samples
            num_samples: number of the heading samples to run, if set to -1, all samples will be used
            warmup: number of samples for warmup runs
            batch_size: batch size for the runs, if left default, the batch size will be the same as the number of samples
            engine_cache_path: path to the engine file, if provided, the engine will save the built engine to the path and reuse it for the next runs
            memory_monitor_interval: the interval to monitor the host and GPU memory usage, if set to None, the memory monitor will be disabled
            kwargs: the additional arguments are for the LLM constructor

        Raises:
            AssertionError: if ``num_samples`` exceeds the dataset size.
            Exception: whatever the LLM constructor, engine saving or dataset
                loading raises; the memory monitor thread is stopped first.
        '''

        from_cache = False
        # Path(...) also accepts a plain string path; the original
        # Path.exists(engine_cache_path) call only worked for Path objects.
        if engine_cache_path and Path(engine_cache_path).exists(
        ) and not is_directory_empty(engine_cache_path):
            print(f"Loading engine from {engine_cache_path}\n")
            from_cache = True
            model = engine_cache_path

        # The monitor is started before the LLM is built so that the samples
        # also cover engine construction / loading.
        memory_monitor_thread = None
        if memory_monitor_interval is not None:
            memory_monitor_thread = MemoryContinuousMonitorThread(
                sampling_interval=memory_monitor_interval)
            memory_monitor_thread.start()

        try:
            # TODO[chunweiy]: Fixit, this barely work, the cpp runtime will trigger RuntimeError, which cannot be caught
            try:
                llm = LLM(model, skip_tokenizer_init=True, **kwargs)
            except Exception:
                logger.error(f"Failed to create LLM with {model} and {kwargs}")
                raise

            if engine_cache_path is not None and not from_cache:
                print_colored(f"Saving engine to {engine_cache_path}\n", "green")
                llm.save(engine_cache_path)

            samples: List[Sample] = list(cls.load_dataset(samples_path))
            assert len(
                samples
            ) >= num_samples, f"num_samples {num_samples} is too large. The dataset only has {len(samples)} samples."
            samples = samples[:num_samples] if num_samples > 0 else samples
        except Exception:
            # Without this the monitor thread would keep sampling forever
            # after a failed setup, since nobody else holds a reference to it.
            if memory_monitor_thread is not None:
                memory_monitor_thread.stop()
                memory_monitor_thread.join()
            raise

        return cls(llm,
                   warmup=warmup,
                   samples=samples,
                   max_num_samples=num_samples,
                   batch_size=batch_size,
                   memory_monitor_thread=memory_monitor_thread)

    def __init__(self,
                 llm: LLM,
                 samples: List[Sample],
                 warmup: int,
                 max_num_samples: int,
                 batch_size: int,
                 memory_monitor_thread: Optional[
                     MemoryContinuousMonitorThread] = None):
        '''
        Args:
            llm: the LLM instance used for generation.
            samples: the samples to run.
            warmup: number of warm-up requests issued before measuring.
            max_num_samples: the sample limit requested by the caller; only
                stored on the instance.
            batch_size: number of concurrent lanes; a value <= 0 means one
                lane per sample.
            memory_monitor_thread: an already started monitor thread, or None.
        '''
        self.llm = llm
        self.samples = samples
        self.warmup = warmup
        self.max_num_samples = max_num_samples
        self.perf_items = []
        self.batch_size = batch_size if batch_size > 0 else len(self.samples)
        self.memory_monitor_thread = memory_monitor_thread

        # Timestamps of the measured phase, set by run().
        self.start = None
        self.end = None

    def run(self, end_id: int = -1, beam_width: int = 1) -> Report:
        '''
        Run the warm-up requests, then the measured requests, and build a report.

        Args:
            end_id: initial end/pad token id of the sampling params.
            beam_width: beam width used for generation.

        Returns:
            The Report of the measured phase.

        Raises:
            ValueError: if the evaluator has no samples.
        '''
        if not self.samples:
            raise ValueError("LLMPerfEvaluator has no samples to run.")

        # reset states
        self.perf_items = []
        sample_offset = 0

        sampling_params = SamplingParams(
            end_id=end_id,
            pad_id=end_id,
            beam_width=beam_width,
        )

        async def lane(num_tasks: int,
                       sampling_params: SamplingParams,
                       warmup=False):
            # Each lane issues its requests one after another; concurrent lanes
            # share sample_offset to pull the next unprocessed sample.
            nonlocal sample_offset

            for _ in range(num_tasks):
                sample = self.samples[sample_offset]
                sample_offset += 1
                sampling_params.max_new_tokens = sample.output_len
                # NOTE(review): this overrides the end_id/pad_id given to
                # run(); presumably so generation is not cut short by an end
                # token -- confirm that ignoring `end_id` is intended.
                sampling_params.end_id = -2
                sampling_params.pad_id = -2

                start = time.time()
                output = self.llm.generate_async(
                    sample.input_ids, sampling_params=sampling_params)
                output = await output.aresult()
                end = time.time()

                perf_item = PerfItem(start=start,
                                     end=end,
                                     num_out_tokens=sum(
                                         beam_output.length
                                         for beam_output in output.outputs))
                if not warmup:
                    self.perf_items.append(perf_item)

        if self.warmup > 0:
            logger.warning("warming up ...")
            remaining = self.warmup
            while remaining > 0:
                round_size = min(remaining, len(self.samples))
                # Every warm-up round restarts from the first sample;
                # otherwise a warm-up larger than the dataset would index
                # past the end of self.samples.
                sample_offset = 0
                asyncio.run(lane(round_size, sampling_params, warmup=True))
                remaining -= round_size
            sample_offset = 0

        logger.warning("running ...")
        self.start = time.time()

        async def run_lanes():
            # Never use more lanes than samples, so that every lane has at
            # least one task when batch_size exceeds the dataset size.
            num_lanes = min(self.batch_size, len(self.samples))
            # NOTE(review): floor division means up to num_lanes - 1 trailing
            # samples are not run when the sizes do not divide evenly.
            num_tasks = len(self.samples) // num_lanes
            lanes = [lane(num_tasks, sampling_params) for _ in range(num_lanes)]
            await asyncio.gather(*lanes)

        asyncio.run(run_lanes())
        self.end = time.time()

        return self._generate_report()

    @staticmethod
    def load_dataset(path: Path) -> Iterable[Sample]:
        '''
        Yield the samples stored in a JSON dataset file.

        The file must contain an object with a "samples" list whose items have
        "input_ids" and "output_len" entries.
        '''
        with open(path, 'r') as f:
            dataset = json.load(f)

        for sample in dataset["samples"]:
            yield Sample(input_ids=sample["input_ids"],
                         output_len=sample["output_len"])

    def _generate_report(self) -> Report:
        '''
        Summarize the measured requests of the last run().

        ``avg_latency`` is the wall-clock duration of the measured phase
        divided by the number of measured requests. Ratios whose denominator
        is zero are reported as 0.0 instead of raising ZeroDivisionError.
        '''
        num_samples = len(self.perf_items)
        total_latency = self.end - self.start
        total_tokens = sum(perf_item.num_out_tokens
                           for perf_item in self.perf_items)

        if num_samples == 0:
            logger.warning("no measured samples, the report will be empty")

        per_sample = 1 / num_samples if num_samples else 0.0
        per_second = 1 / total_latency if total_latency > 0 else 0.0

        return Report(
            num_samples=num_samples,
            total_latency=total_latency,
            avg_latency=total_latency * per_sample,
            seq_throughput=num_samples * per_second,
            token_throughput=total_tokens * per_second,
            ave_tokens_per_sample=total_tokens * per_sample,
            memory_usage_samples=self.memory_monitor_thread.memory_samples
            if self.memory_monitor_thread else [])

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        ''' Release the LLM, then stop and join the memory monitor thread. '''
        try:
            self.llm.__exit__(exc_type, exc_value, traceback)
            del self.llm
            release_gc()
        finally:
            # Stop the monitor even when releasing the LLM fails, so the
            # thread does not outlive the evaluator.
            if self.memory_monitor_thread:
                self.memory_monitor_thread.stop()
                self.memory_monitor_thread.join()
                del self.memory_monitor_thread