import asyncio
import os
import platform
from collections.abc import AsyncGenerator
from threading import Thread
from typing import TYPE_CHECKING, Any, Optional

import torch
from typing_extensions import override

from ..data import get_template_and_fix_tokenizer
from ..extras import logging
from ..extras.constants import EngineName
from ..model import load_model, load_tokenizer
from .base_engine import BaseEngine, Response


if TYPE_CHECKING:
    from transformers import PreTrainedTokenizer
    from trl import PreTrainedModelWrapper

    from ..data.mm_plugin import AudioInput, ImageInput, VideoInput
    from ..hparams import DataArguments, FinetuningArguments, GeneratingArguments, ModelArguments

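# KTransformers is imported unconditionally below, so loading this module requires the
# ktransformers package to be installed.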
from ktransformers.operators.flashinfer_wrapper import flashinfer_enabled
from ktransformers.server.config.config import Config
from ktransformers.util.utils import (
    get_compute_capability,
    prefill_and_generate_capture,
)
from ktransformers.util.vendors import GPUVendor, device_manager


logger = logging.get_logger(__name__)


class KTransformersEngine(BaseEngine):
    def __init__(
        self,
        model_args: "ModelArguments",
        data_args: "DataArguments",
        finetuning_args: "FinetuningArguments",
        generating_args: "GeneratingArguments",
    ) -> None:
        self.name = EngineName.KT
        self.can_generate = finetuning_args.stage == "sft"

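        # Load the tokenizer and chat template; pad on the left for generation and on the
        # right when the engine is used for scoring.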
        tok_mod = load_tokenizer(model_args)
        self.tokenizer = tok_mod["tokenizer"]
        self.tokenizer.padding_side = "left" if self.can_generate else "right"
        self.template = get_template_and_fix_tokenizer(self.tokenizer, data_args)

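        # Load the model for inference only; attach a value head when the engine is not
        # used for generation (i.e. reward/value scoring).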
        self.model = load_model(
            self.tokenizer, model_args, finetuning_args, is_trainable=False, add_valuehead=(not self.can_generate)
        )

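        # KTransformers-specific generation settings: token budget, CUDA-graph capture,
        # inference mode, forced "<think>" prefix, and prefill chunk size.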
        self.generating_args = generating_args.to_dict()
        self.max_new_tokens = model_args.kt_maxlen
        self.use_cuda_graph = model_args.kt_use_cuda_graph
        self.mode = model_args.kt_mode
        self.force_think = model_args.kt_force_think
        self.chunk_size = model_args.chunk_size

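        # Make sure the current thread has an event loop so the async APIs below can be
        # used from synchronous entry points.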
        try:
            asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

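        # Cap concurrent requests; MAX_CONCURRENT defaults to a single in-flight generation.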
        self.semaphore = asyncio.Semaphore(int(os.getenv("MAX_CONCURRENT", "1")))

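    # Compute scalar value-head scores for a batch of texts (used when serving a reward model).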
    @staticmethod
    @torch.inference_mode()
    def _get_scores(
        model: "PreTrainedModelWrapper",
        tokenizer: "PreTrainedTokenizer",
        batch_input: list[str],
        input_kwargs: Optional[dict[str, Any]] = None,
    ) -> list[float]:
        input_kwargs = input_kwargs or {}
        max_length: Optional[int] = input_kwargs.pop("max_length", None)
        device = getattr(model.pretrained_model, "device", "cuda")
        inputs = tokenizer(
            batch_input,
            padding=True,
            truncation=True,
            max_length=max_length or getattr(model.config, "max_position_embeddings", 1024),
            return_tensors="pt",
            add_special_tokens=False,
        ).to(device)
        values: torch.Tensor = model(**inputs, return_dict=True, use_cache=False)[-1]
        # Pick the value predicted at the last non-padding token of each sequence.
        scores = values.gather(dim=-1, index=(inputs["attention_mask"].sum(dim=-1, keepdim=True) - 1))
        return scores.squeeze(dim=-1).float().tolist()

    async def _generate(
        self,
        messages: list[dict[str, str]],
        system: Optional[str] = None,
        tools: Optional[str] = None,
        **input_kwargs,
    ) -> AsyncGenerator[str, None]:
        paired = messages + [{"role": "assistant", "content": ""}]
        prompt_ids, _ = self.template.encode_oneturn(self.tokenizer, paired, system, tools)
        prompt_len = len(prompt_ids)

        max_length: Optional[int] = input_kwargs.pop("max_length", None)
        max_new_tokens: Optional[int] = input_kwargs.pop("max_new_tokens", None)

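        # Resolve the generation budget: start from generating_args (or the kt_maxlen
        # default), then let per-request max_length / max_new_tokens override it.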
| | if "max_new_tokens" in self.generating_args: |
| | max_tokens = int(self.generating_args["max_new_tokens"]) |
| | elif "max_length" in self.generating_args: |
| | gl = int(self.generating_args["max_length"]) |
| | max_tokens = gl - prompt_len if gl > prompt_len else 1 |
| | else: |
| | max_tokens = self.max_new_tokens or 256 |
| |
|
| | if max_length is not None: |
| | max_tokens = max(max_length - prompt_len, 1) |
| | if max_new_tokens is not None: |
| | max_tokens = int(max_new_tokens) |
| | max_tokens = max(1, int(max_tokens)) |
| |
|
| | if self.mode == "long_context": |
| | max_len_cfg = Config().long_context_config["max_seq_len"] |
| | need = prompt_len + max_tokens |
| | assert max_len_cfg > need, f"please set max_seq_len > {need} in ~/.ktransformers/config.yaml" |
| |
|
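        # Build the prompt tensor on the model device; kt_force_think appends the token
        # ids of "<think>\n" to force a thinking prefix.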
        device = next(self.model.parameters()).device
        input_tensor = torch.tensor([prompt_ids], dtype=torch.long, device=device)
        if self.force_think:
            think = torch.tensor(
                [self.tokenizer.encode("<think>\n", add_special_tokens=False)], dtype=torch.long, device=device
            )
            input_tensor = torch.cat([input_tensor, think], dim=1)

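        # Use the FlashInfer MLA kernels only for DeepSeek-V2/V3 architectures on NVIDIA
        # GPUs with compute capability >= 8, and never on Windows.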
        use_flashinfer = (
            platform.system() != "Windows"
            and getattr(self.model.config, "architectures", [""])[0]
            in {"DeepseekV2ForCausalLM", "DeepseekV3ForCausalLM"}
            and flashinfer_enabled
            and get_compute_capability() >= 8
            and device_manager.gpu_vendor == GPUVendor.NVIDIA
        )

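        # prefill_and_generate_capture drives prefill and decoding inside the KTransformers
        # runtime; the MLA head dimensions are only needed on the FlashInfer path.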
        def make_gen():
            if use_flashinfer:
                return prefill_and_generate_capture(
                    self.model,
                    self.tokenizer,
                    input_tensor,
                    max_tokens,
                    self.use_cuda_graph,
                    mode=self.mode,
                    force_think=self.force_think,
                    chunk_size=self.chunk_size,
                    use_flashinfer_mla=True,
                    num_heads=self.model.config.num_attention_heads,
                    head_dim_ckv=getattr(self.model.config, "kv_lora_rank", 0),
                    head_dim_kpe=getattr(self.model.config, "qk_rope_head_dim", 0),
                    q_head_dim=getattr(self.model.config, "qk_rope_head_dim", 0)
                    + getattr(self.model.config, "qk_nope_head_dim", 0),
                    echo_stream=False,
                )
            else:
                return prefill_and_generate_capture(
                    self.model,
                    self.tokenizer,
                    input_tensor,
                    max_tokens,
                    self.use_cuda_graph,
                    mode=self.mode,
                    force_think=self.force_think,
                    chunk_size=self.chunk_size,
                    echo_stream=False,
                )

        loop = asyncio.get_running_loop()
        q: asyncio.Queue[Optional[str]] = asyncio.Queue()

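        # Run the (blocking) KTransformers generator on a daemon thread and hand each piece
        # of text back to the event loop through the queue; None marks end of stream.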
        def producer():
            try:
                gen = make_gen()
                if hasattr(gen, "__aiter__"):

                    async def drain_async():
                        async for t in gen:
                            loop.call_soon_threadsafe(q.put_nowait, t if isinstance(t, str) else str(t))

                    asyncio.run(drain_async())
                elif hasattr(gen, "__iter__"):
                    for t in gen:
                        loop.call_soon_threadsafe(q.put_nowait, t if isinstance(t, str) else str(t))
                else:
                    loop.call_soon_threadsafe(q.put_nowait, gen if isinstance(gen, str) else str(gen))
            finally:
                loop.call_soon_threadsafe(q.put_nowait, None)

        Thread(target=producer, daemon=True).start()

        while True:
            item = await q.get()
            if item is None:
                break
            yield item

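    # Non-streaming chat: drain the generator and wrap the full completion in a single Response.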
    @override
    async def chat(
        self,
        messages: list[dict[str, str]],
        system: Optional[str] = None,
        tools: Optional[str] = None,
        images: Optional[list["ImageInput"]] = None,
        videos: Optional[list["VideoInput"]] = None,
        audios: Optional[list["AudioInput"]] = None,
        **input_kwargs,
    ) -> list["Response"]:
        if not self.can_generate:
            raise ValueError("The current model does not support `chat`.")
        async with self.semaphore:
            produced = ""
            final_text = ""
            async for t in self._generate(messages, system, tools, **input_kwargs):
                # The generator may yield either incremental pieces or the full text so far;
                # keep only the new part, mirroring `stream_chat`.
                delta = t[len(produced) :] if t.startswith(produced) else t
                produced = t
                if delta:
                    final_text += delta

            prompt_ids, _ = self.template.encode_oneturn(
                self.tokenizer, messages + [{"role": "assistant", "content": ""}], system, tools
            )
            return [
                Response(
                    response_text=final_text,
                    response_length=len(self.tokenizer.encode(final_text, add_special_tokens=False)),
                    prompt_length=len(prompt_ids),
                    finish_reason="stop",
                )
            ]

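    # Streaming chat: yields only the new text of each step, whether the underlying
    # generator emits incremental deltas or the accumulated text so far.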
    @override
    async def stream_chat(
        self,
        messages: list[dict[str, str]],
        system: Optional[str] = None,
        tools: Optional[str] = None,
        images: Optional[list["ImageInput"]] = None,
        videos: Optional[list["VideoInput"]] = None,
        audios: Optional[list["AudioInput"]] = None,
        **input_kwargs,
    ) -> AsyncGenerator[str, None]:
        if not self.can_generate:
            raise ValueError("The current model does not support `stream_chat`.")
        async with self.semaphore:
            produced = ""
            async for t in self._generate(messages, system, tools, **input_kwargs):
                delta = t[len(produced) :] if t.startswith(produced) else t
                produced = t
                if delta:
                    yield delta

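    # Reward scoring entry point; only valid when the engine was loaded with a value head.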
    @override
    async def get_scores(
        self,
        batch_input: list[str],
        **input_kwargs,
    ) -> list[float]:
        if self.can_generate:
            raise ValueError("Cannot get scores using an auto-regressive model.")
        args = (self.model, self.tokenizer, batch_input, input_kwargs)
        async with self.semaphore:
            return await asyncio.to_thread(self._get_scores, *args)