import os
import shutil
import tempfile
from pathlib import Path
from typing import Any, Iterable, List, Optional, Union
from transformers import PreTrainedTokenizerBase
from ..bindings import executor as tllm
from ..executor import GenerationExecutor, GenerationResult
from ..logger import logger
from .llm_utils import (CachedModelLoader, LlmArgs, LlmBuildStats, ModelLoader,
_ModelRuntimeContext)
from .mpi_session import (MpiCommSession, MpiPoolSession, MpiSession,
external_mpi_comm_available)
from .tokenizer import TokenizerBase
# TODO[chunweiy]: move the following symbols back to utils scope, and remove the following import
from .utils import SamplingParams, exception_handler, get_device_count
class RequestOutput(GenerationResult):
"""The output data of a completion request to the LLM.
Fields:
request_id (int): The unique ID of the request.
prompt (str): The prompt string of the request.
prompt_token_ids (List[int]): The token ids of the prompt.
outputs (List[CompletionOutput]): The output sequences of the request.
context_logits (torch.Tensor): The logits on the prompt token ids.
finished (bool): Whether the whole request is finished.
"""
def __init__(self,
generation_result: GenerationResult,
prompt: Optional[str] = None,
tokenizer: Optional[TokenizerBase] = None) -> None:
self.__dict__.update(generation_result.__dict__)
self.prompt = prompt
self.tokenizer = tokenizer
def handle_generation_msg(self, tensors: tuple, error: str):
super().handle_generation_msg(tensors, error)
if self.tokenizer is not None:
for beam_output in self.outputs:
beam_output.text = self.tokenizer.decode(beam_output.token_ids)
def _repr_fields(self):
return [
'request_id', 'prompt', 'prompt_token_ids', 'outputs', 'finished'
]
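
# A minimal usage sketch (the `llm` instance is hypothetical here; see
# LLM.generate below for how a RequestOutput is actually produced):
#
#   output = llm.generate("Hello, world")
#   print(output.request_id, output.prompt)
#   for beam in output.outputs:
#       print(beam.text)
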
class LLM:
def __init__(self,
model: str,
tokenizer: Optional[Union[str, Path, TokenizerBase,
PreTrainedTokenizerBase]] = None,
skip_tokenizer_init: bool = False,
tensor_parallel_size: int = 1,
dtype: str = "auto",
revision: Optional[str] = None,
tokenizer_revision: Optional[str] = None,
**kwargs: Any):
'''
Args:
model(str): The model name or a local path to the model directory. It could be a HuggingFace(HF) model name,
a local path to the HF model, or a local path to the TRT-LLM engine or checkpoint.
tokenizer(Optional[Union[str, Path, TokenizerBase, PreTrainedTokenizerBase]]): The tokenizer name or a local
path to the tokenizer directory.
            skip_tokenizer_init(bool): If True, skip initialization of the tokenizer and the detokenizer;
                generate and generate_async will then accept prompt token ids as input only.
tensor_parallel_size(int): The number of processes for tensor parallelism.
dtype(str): The data type for the model weights and activations.
revision(Optional[str]): The revision of the model.
            tokenizer_revision(Optional[str]): The revision of the tokenizer.
            kwargs(Any): Optional arguments for expert users; please refer to `llm_utils.LlmArgs` for more details.
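
        Example (an illustrative sketch; the model name and parallel size are placeholders):
            >>> llm = LLM("TinyLlama/TinyLlama-1.1B-Chat-v1.0",
            ...           tensor_parallel_size=2)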
'''
try:
self.args = LlmArgs.from_kwargs(
model=model,
tokenizer=tokenizer,
skip_tokenizer_init=skip_tokenizer_init,
tensor_parallel_size=tensor_parallel_size,
dtype=dtype,
revision=revision,
tokenizer_revision=tokenizer_revision,
**kwargs)
except Exception as e:
logger.error(
f"Failed to parse the arguments for the LLM constructor: {e}")
            raise  # re-raise with the original traceback
self.mpi_session: Optional[MpiSession] = None
if self.args.parallel_config.is_multi_gpu:
if get_device_count() < self.args.parallel_config.world_size:
raise RuntimeError(
f"Only {get_device_count()} GPUs are available, but {self.args.parallel_config.world_size} are required."
)
logger.info(
            f'Starting MpiSession with {self.args.parallel_config.world_size} workers'
)
if not external_mpi_comm_available(
self.args.parallel_config.world_size):
self.mpi_session = MpiPoolSession(
n_workers=self.args.parallel_config.world_size)
else:
self.mpi_session = MpiCommSession(
n_workers=self.args.parallel_config.world_size)
        # Because the Executor only accepts an engine path, the built engine is saved to a directory first
self._engine_dir: Optional[Path] = None
self._executor: Optional[GenerationExecutor] = None
        self._workspace = tempfile.TemporaryDirectory(suffix="llm-workspace")
self.runtime_context: Optional[_ModelRuntimeContext] = None
self.llm_build_stats = LlmBuildStats()
self._build_model()
exception_handler.register(self)
@property
def workspace(self) -> Path:
return Path(self._workspace.name)
def generate(
self,
prompts: Union[str, Iterable[str], List[int], Iterable[List[int]]],
sampling_params: Optional[Union[SamplingParams,
List[SamplingParams]]] = None
) -> Union[RequestOutput, List[RequestOutput]]:
''' Generate output for the given prompts in the synchronous mode.
        Synchronous generation accepts either a single prompt or batched prompts.
Args:
prompts: The prompt text or token ids; could be either single prompt or batched prompts.
sampling_params: The sampling params for the generation, a default one will be used if not provided.
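
        Example (an illustrative sketch; prompts and outputs are placeholders):
            >>> outputs = llm.generate(["Hello, my name is",
            ...                         "The capital of France is"])
            >>> for output in outputs:
            ...     print(output.outputs[0].text)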
'''
        # A single string, or a container whose first element is a string,
        # means string prompt(s); otherwise the input is token ids. A single
        # string or a single list of ints counts as an unbatched prompt.
        if isinstance(prompts, str) or isinstance(prompts[0], str):
            unbatched = isinstance(prompts, str)
        else:
            unbatched = isinstance(prompts[0], int)
if unbatched:
prompts = [prompts]
futures = []
for i, prompt in enumerate(prompts):
if isinstance(sampling_params, list):
sp = sampling_params[i]
else:
sp = sampling_params
future = self.generate_async(prompt,
sampling_params=sp,
streaming=False)
futures.append(future)
for future in futures:
future.result()
if unbatched:
futures = futures[0]
return futures
def generate_async(
self,
prompt: Union[str, List[int]],
sampling_params: Optional[SamplingParams] = None,
streaming: bool = False,
) -> RequestOutput:
''' Generate output for the given prompt in the asynchronous mode.
        Asynchronous generation accepts a single prompt only.
Args:
            prompt: The prompt text or token ids; must be a single prompt.
sampling_params: The sampling params for the generation, a default one will be used if not provided.
streaming: Whether to use the streaming mode for the generation.
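
        Example (an illustrative sketch; assumes the streaming result can be
        iterated for partial outputs):
            >>> future = llm.generate_async("The future of AI is", streaming=True)
            >>> for partial in future:
            ...     print(partial.outputs[0].text)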
'''
if isinstance(prompt, str):
prompt_token_ids = self._prepare_prompt_token_ids(prompt)
elif isinstance(prompt, list) and isinstance(prompt[0], int):
prompt_token_ids = prompt
prompt = None
else:
raise TypeError(
f"The prompt must be type str or list of int, but got {type(prompt)}"
)
sampling_params = self._prepare_sampling_params(sampling_params)
self._check_arguments(prompt_token_ids, sampling_params)
result = self._executor.generate_async(
prompt_token_ids,
sampling_params=sampling_params,
streaming=streaming,
)
return RequestOutput(result, prompt, self.tokenizer)
def _prepare_prompt_token_ids(self, prompt: str) -> List[int]:
if self.tokenizer is None:
raise ValueError("tokenizer is required to tokenize string prompt")
return self.tokenizer.encode(prompt)
def _prepare_sampling_params(
self,
sampling_params: Optional[SamplingParams] = None) -> SamplingParams:
if sampling_params is None:
if self.tokenizer is None:
raise ValueError(
"tokenizer is required to initialize a default sampling_params, or you can explicitly specify a sampling_params"
)
return SamplingParams(end_id=self.tokenizer.eos_token_id,
pad_id=self.tokenizer.pad_token_id)
elif isinstance(sampling_params, SamplingParams):
if sampling_params.end_id is None:
if self.tokenizer is None:
raise ValueError(
"tokenizer is required to reset end_id if it is None, or you can explicitly specify the end_id for sampling_params"
)
return sampling_params.setup(self.tokenizer)
else:
raise TypeError(
f"The sampling_params must be type SamplingParams or None, but got {type(sampling_params)}"
)
def _check_arguments(self, prompt_token_ids: List[int],
sampling_params: SamplingParams) -> None:
build_config = self.args.build_config
prompt_len = len(prompt_token_ids)
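        # e.g., with max_seq_len=2048, a 100-token prompt admits at most
        # max_new_tokens=1948, since prompt_len + max_new_tokens must stay
        # within max_seq_len.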
if prompt_len + sampling_params.max_new_tokens > build_config.max_seq_len:
raise ValueError(
f"The sum of prompt length ({prompt_len}) and max_new_tokens ({sampling_params.max_new_tokens}) should not exceed "
f"max_seq_len ({build_config.max_seq_len})")
if sampling_params.beam_width > build_config.max_beam_width:
raise ValueError(
f"sampling_params's beam_width ({sampling_params.beam_width}) should not exceed max_beam_width ({build_config.max_beam_width})"
)
def _build_model(self):
model_loader = CachedModelLoader(self.args,
mpi_session=self.mpi_session,
workspace=self.workspace,
llm_build_stats=self.llm_build_stats)
self._engine_dir = model_loader()
executor_config = tllm.ExecutorConfig(
max_beam_width=self.args.build_config.max_beam_width,
scheduler_config=self.args.scheduler_config,
batching_type=tllm.BatchingType.INFLIGHT)
if self.args.kv_cache_config is not None:
executor_config.kv_cache_config = self.args.kv_cache_config
if self.args.peft_cache_config is not None:
executor_config.peft_cache_config = self.args.peft_cache_config
if self.args.decoding_config is not None:
executor_config.decoding_config = self.args.decoding_config
if self.args.logits_post_processor_map:
executor_config.logits_post_processor_map = self.args.logits_post_processor_map
        executor_config.normalize_log_probs = self.args.normalize_log_probs
        executor_config.enable_chunked_context = self.args.enable_chunked_context
self._executor = GenerationExecutor.create(
self._engine_dir,
executor_config=executor_config,
model_world_size=self.args.parallel_config.world_size,
mpi_session=self.mpi_session,
reuse_mpi_comm=external_mpi_comm_available(
self.args.parallel_config.world_size))
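
    # A minimal sketch of tuning the executor through constructor kwargs
    # consumed by LlmArgs (the option names below are assumptions; the
    # available set depends on the installed tensorrt_llm version):
    #
    #   llm = LLM(model="path/to/engine",
    #             kv_cache_config=tllm.KvCacheConfig(
    #                 free_gpu_memory_fraction=0.9))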
@property
    def tokenizer(self) -> Optional[TokenizerBase]:
if self.args.skip_tokenizer_init:
return None
if self.args.tokenizer is not None:
return self.args.tokenizer
if self.runtime_context is not None:
return self.runtime_context.tokenizer
try:
self.args.tokenizer = ModelLoader.load_hf_tokenizer(
self.args.model_dir)
        except Exception:
            # Loading is best-effort; leave the tokenizer unset on failure.
            pass
return self.args.tokenizer
def save(self, engine_dir: str):
''' Save the built engine to the given path. '''
logger.info(f"Save model to {engine_dir}")
if self._engine_dir is None:
raise RuntimeError("The engine is not built yet.")
        if self._engine_dir.absolute() != Path(engine_dir).absolute():
shutil.copytree(self._engine_dir, engine_dir, dirs_exist_ok=True)
def shutdown(self):
if hasattr(self, "_executor") and self._executor is not None:
self._executor.shutdown()
if self.mpi_session is not None:
self.mpi_session.shutdown()
self.mpi_session = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback) -> bool:
del exc_value, traceback
self.shutdown()
        return False  # do not suppress exceptions raised inside the context
def __getstate__(self):
raise RuntimeError("LLM object can not be pickled.")
def __del__(self):
self.shutdown()
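

if __name__ == "__main__":
    # A minimal smoke-test sketch, guarded so it never runs on import. The
    # model name is a placeholder and must point to a real HF model or
    # TRT-LLM engine available locally.
    with LLM(model="TinyLlama/TinyLlama-1.1B-Chat-v1.0") as llm:
        output = llm.generate("The capital of France is")
        print(output.outputs[0].text)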