# NOTE(review): "Spaces: / Runtime error / Runtime error" above is an
# extraction artifact from the hosting page (HF Spaces status banner),
# not part of this module — safe to remove when the file is cleaned up.
| """ | |
| vLLM integration for ultra-fast LLM inference with PagedAttention. | |
| Achieves 10-100x throughput compared to standard HuggingFace. | |
| """ | |
| import time | |
| import torch | |
| from typing import List, Dict, Any, Optional, Generator | |
| from pathlib import Path | |
| import json | |
| import logging | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| # Try to import vLLM, fallback to standard transformers | |
| try: | |
| from vllm import LLM, SamplingParams | |
| from vllm.outputs import RequestOutput | |
| VLLM_AVAILABLE = True | |
| except ImportError: | |
| VLLM_AVAILABLE = False | |
| logging.warning("vLLM not available, falling back to standard transformers") | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForCausalLM, | |
| pipeline, | |
| TextStreamer, | |
| GenerationConfig | |
| ) | |
| from app.hyper_config import config | |
| logger = logging.getLogger(__name__) | |
class InferenceEngine(str, Enum):
    """Supported inference backends, roughly ordered by expected throughput.

    Inherits from ``str`` so members compare equal to their plain string
    values (e.g. ``InferenceEngine.VLLM == "vllm"``) and serialize cleanly.
    """

    VLLM = "vllm"                  # Ultra-fast with PagedAttention
    TRANSFORMERS = "transformers"  # Standard HuggingFace
    ONNX = "onnx"                  # ONNX Runtime
    TENSORRT = "tensorrt"          # NVIDIA TensorRT
@dataclass
class GenerationResult:
    """Result of a single text-generation call, plus timing metadata.

    BUG FIX: the original declared annotated fields on a plain class with no
    ``@dataclass`` decorator, so no ``__init__`` was generated — yet every
    call site constructs it with keyword arguments. ``dataclass`` is already
    imported at module level.
    """

    text: str                      # the generated completion text
    tokens: List[str]              # token strings (empty for vLLM results)
    generation_time_ms: float      # wall-clock generation time
    tokens_per_second: float       # throughput for this request
    prompt_tokens: int             # token count of the input prompt
    generated_tokens: int          # token count of the completion
    finish_reason: str             # e.g. "stop" / "length"
    engine: InferenceEngine        # backend that produced this result
class UltraFastLLM:
    """
    Ultra-fast LLM inference with multiple engine support.

    Features:
    - vLLM with PagedAttention (10-100x throughput)
    - Continuous batching for high concurrency
    - Quantization support (GPTQ, AWQ, GGUF)
    - Streaming responses
    - Adaptive engine selection
    """

    def __init__(
        self,
        model_name: str = None,
        engine: InferenceEngine = None,
        quantization: str = None,
        max_model_len: int = 4096,
        gpu_memory_utilization: float = 0.9
    ):
        """
        Args:
            model_name: HF model id; defaults to ``config.llm_model``.
            engine: Inference backend; defaults to vLLM when installed,
                otherwise transformers.
            quantization: Quantization scheme name; defaults to
                ``config.llm_quantization.value``.
            max_model_len: Maximum context length (vLLM engine only).
            gpu_memory_utilization: Fraction of GPU memory vLLM may claim.
        """
        self.model_name = model_name or config.llm_model
        # BUG FIX: the original `engine or A if COND else B` parsed as
        # `(engine or A) if COND else B`, so an explicitly requested engine
        # was silently discarded whenever vLLM was not installed.
        self.engine = engine or (
            InferenceEngine.VLLM if VLLM_AVAILABLE else InferenceEngine.TRANSFORMERS
        )
        self.quantization = quantization or config.llm_quantization.value
        self.max_model_len = max_model_len
        self.gpu_memory_utilization = gpu_memory_utilization
        self.llm = None        # vLLM engine instance (vLLM engine only)
        self.tokenizer = None  # shared tokenizer, set by either backend
        self.pipeline = None   # HF pipeline (transformers engine only)
        self._initialized = False
        # Performance tracking, aggregated across generate() calls.
        self.total_requests = 0
        self.total_tokens = 0
        self.total_time_ms = 0.0
        # Engine-specific configurations, forwarded to each backend loader.
        self.engine_configs = {
            InferenceEngine.VLLM: {
                "tensor_parallel_size": 1,
                "pipeline_parallel_size": 1,
                "enable_prefix_caching": True,
                "block_size": 16,
                "swap_space": 4,  # GB
                "max_num_seqs": 256,
            },
            InferenceEngine.TRANSFORMERS: {
                "device_map": "auto",
                "low_cpu_mem_usage": True,
                "torch_dtype": torch.float16 if torch.cuda.is_available() else torch.float32,
            },
            InferenceEngine.ONNX: {
                "provider": "CPUExecutionProvider",
                "session_options": {
                    "intra_op_num_threads": 4,
                    "inter_op_num_threads": 2,
                }
            }
        }
        logger.info(f"🚀 Initializing UltraFastLLM with engine: {self.engine.value}")

    def initialize(self):
        """Load the selected engine; idempotent, falls back to transformers.

        Raises:
            Exception: re-raised only when the transformers fallback itself
                fails; any other engine failure triggers a fallback instead.
        """
        if self._initialized:
            return
        logger.info(f"Loading model: {self.model_name}")
        logger.info(f"Engine: {self.engine.value}")
        logger.info(f"Quantization: {self.quantization}")
        start_time = time.perf_counter()
        try:
            if self.engine == InferenceEngine.VLLM and VLLM_AVAILABLE:
                self._initialize_vllm()
            elif self.engine == InferenceEngine.TRANSFORMERS:
                self._initialize_transformers()
            elif self.engine == InferenceEngine.ONNX:
                self._initialize_onnx()
            else:
                raise ValueError(f"Unsupported engine: {self.engine}")
            init_time = (time.perf_counter() - start_time) * 1000
            logger.info(f"✅ LLM initialized in {init_time:.1f}ms")
            # BUG FIX: mark ready *before* warming up. _warm_up() calls
            # generate(), which re-enters initialize() while _initialized is
            # still False — the original ordering reloaded the model
            # recursively until the stack blew up.
            self._initialized = True
            self._warm_up()
        except Exception as e:
            self._initialized = False
            logger.error(f"❌ Failed to initialize LLM: {e}")
            # Fallback to transformers
            if self.engine != InferenceEngine.TRANSFORMERS:
                logger.warning("Falling back to transformers engine")
                self.engine = InferenceEngine.TRANSFORMERS
                self.initialize()
            else:
                raise

    def _initialize_vllm(self):
        """Initialize the vLLM engine."""
        from vllm import LLM

        logger.info("Initializing vLLM engine...")
        # BUG FIX: vLLM selects quantized kernels via the string-valued
        # `quantization=` argument ("gptq" / "awq" / None). The GPTQConfig /
        # AWQConfig classes the original imported do not exist in vLLM's
        # public API and raised ImportError before the model ever loaded.
        quantization = self.quantization if self.quantization in ("gptq", "awq") else None
        self.llm = LLM(
            model=self.model_name,
            tokenizer=self.model_name,
            max_model_len=self.max_model_len,
            gpu_memory_utilization=self.gpu_memory_utilization,
            quantization=quantization,
            **self.engine_configs[InferenceEngine.VLLM]
        )
        self.tokenizer = self.llm.get_tokenizer()
        logger.info(f"vLLM initialized with {self.llm.llm_engine.model_config.get_sliding_window()} sliding window")

    def _initialize_transformers(self):
        """Initialize the standard HuggingFace transformers pipeline."""
        logger.info("Initializing transformers engine...")
        # Load tokenizer; many causal-LM tokenizers ship without a pad token,
        # so reuse EOS to keep batched generation from crashing.
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            trust_remote_code=True
        )
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        # Copy so bitsandbytes options don't leak into engine_configs.
        model_kwargs = self.engine_configs[InferenceEngine.TRANSFORMERS].copy()
        # Add 4/8-bit bitsandbytes quantization if requested.
        if self.quantization in ["int8", "int4"]:
            from transformers import BitsAndBytesConfig
            bnb_config = BitsAndBytesConfig(
                load_in_4bit=self.quantization == "int4",
                load_in_8bit=self.quantization == "int8",
                bnb_4bit_compute_dtype=torch.float16,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4"
            )
            model_kwargs["quantization_config"] = bnb_config
        # Load model
        model = AutoModelForCausalLM.from_pretrained(
            self.model_name,
            **model_kwargs,
            trust_remote_code=True
        )
        # Wrap in a text-generation pipeline for a uniform call interface.
        self.pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=self.tokenizer,
            device_map="auto" if torch.cuda.is_available() else None,
        )
        logger.info("Transformers pipeline initialized")

    def _initialize_onnx(self):
        """Initialize ONNX Runtime engine (not implemented; falls back)."""
        # ONNX would require converting the checkpoint to an ONNX graph
        # first; until that exists, degrade gracefully to transformers.
        logger.warning("ONNX engine not fully implemented, falling back to transformers")
        self.engine = InferenceEngine.TRANSFORMERS
        self._initialize_transformers()

    def _warm_up(self):
        """Warm up the model with short sample prompts (JIT/caches/kernels)."""
        warmup_prompts = [
            "Hello, how are you?",
            "What is artificial intelligence?",
            "Explain machine learning in simple terms."
        ]
        logger.info("Warming up LLM...")
        for prompt in warmup_prompts:
            _ = self.generate(prompt, max_tokens=10)
        logger.info("✅ LLM warm-up complete")

    def generate(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        max_tokens: int = 1024,
        temperature: float = 0.7,
        top_p: float = 0.95,
        stream: bool = False,
        **kwargs
    ) -> GenerationResult:
        """
        Generate text from prompt.

        Args:
            prompt: The input prompt
            system_prompt: Optional system prompt
            max_tokens: Maximum tokens to generate
            temperature: Sampling temperature
            top_p: Top-p sampling parameter
            stream: Whether to stream the response
            **kwargs: Additional generation parameters

        Returns:
            GenerationResult with generated text and metadata
        """
        if not self._initialized:
            self.initialize()
        # Format prompt with system message if provided.
        if system_prompt:
            full_prompt = f"{system_prompt}\n\n{prompt}"
        else:
            full_prompt = prompt
        try:
            if self.engine == InferenceEngine.VLLM and self.llm:
                result = self._generate_vllm(
                    full_prompt, max_tokens, temperature, top_p, stream, **kwargs
                )
            else:
                result = self._generate_transformers(
                    full_prompt, max_tokens, temperature, top_p, stream, **kwargs
                )
            # Update aggregate performance stats.
            self.total_requests += 1
            self.total_tokens += result.generated_tokens
            self.total_time_ms += result.generation_time_ms
            logger.debug(f"Generated {result.generated_tokens} tokens in "
                         f"{result.generation_time_ms:.1f}ms "
                         f"({result.tokens_per_second:.1f} tokens/sec)")
            return result
        except Exception as e:
            logger.error(f"Generation failed: {e}")
            raise

    def _generate_vllm(
        self,
        prompt: str,
        max_tokens: int,
        temperature: float,
        top_p: float,
        stream: bool,
        **kwargs
    ) -> GenerationResult:
        """Generate using the vLLM engine.

        NOTE: the offline ``LLM.generate()`` API is synchronous; `stream` is
        accepted for interface parity but generation is not incremental here.
        """
        sampling_params = SamplingParams(
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            **kwargs
        )
        # BUG FIX: the original streaming branch read `start_time` before it
        # was ever assigned (NameError) and passed `stream=True`, which
        # LLM.generate() does not accept. Time the call from here for both
        # paths and generate synchronously.
        start_time = time.perf_counter()
        outputs = self.llm.generate([prompt], sampling_params)
        generation_time = (time.perf_counter() - start_time) * 1000
        output = outputs[0]
        generated_text = output.outputs[0].text
        generated_tokens = len(output.outputs[0].token_ids)
        prompt_tokens = len(output.prompt_token_ids)
        finish_reason = output.outputs[0].finish_reason
        tokens_per_second = generated_tokens / (generation_time / 1000) if generation_time > 0 else 0
        return GenerationResult(
            text=generated_text,
            tokens=[],  # vLLM doesn't easily expose token strings
            generation_time_ms=generation_time,
            tokens_per_second=tokens_per_second,
            prompt_tokens=prompt_tokens,
            generated_tokens=generated_tokens,
            finish_reason=finish_reason,
            engine=InferenceEngine.VLLM
        )

    def _generate_transformers(
        self,
        prompt: str,
        max_tokens: int,
        temperature: float,
        top_p: float,
        stream: bool,
        **kwargs
    ) -> GenerationResult:
        """Generate using the transformers pipeline engine."""
        start_time = time.perf_counter()
        generation_config = GenerationConfig(
            max_new_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            do_sample=True,
            **kwargs
        )
        if stream and hasattr(self.pipeline, "__call__"):
            # Streaming: TextStreamer prints tokens to stdout as they arrive.
            outputs = self.pipeline(
                prompt,
                generation_config=generation_config,
                streamer=TextStreamer(self.tokenizer, skip_prompt=True),
                return_full_text=False,
                **kwargs
            )
            generated_text = outputs[0]['generated_text']
        else:
            # Non-streaming generation.
            # NOTE(review): max_new_tokens/temperature/top_p are passed both
            # via generation_config and as explicit kwargs; the explicit
            # values win, so this is redundant but harmless.
            outputs = self.pipeline(
                prompt,
                generation_config=generation_config,
                max_new_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                do_sample=True,
                return_full_text=False,
                **kwargs
            )
            generated_text = outputs[0]['generated_text']
        generation_time = (time.perf_counter() - start_time) * 1000
        # Token counting via a re-encode of input and output text.
        prompt_tokens = len(self.tokenizer.encode(prompt))
        generated_tokens = len(self.tokenizer.encode(generated_text))
        tokens_per_second = generated_tokens / (generation_time / 1000) if generation_time > 0 else 0
        return GenerationResult(
            text=generated_text,
            tokens=self.tokenizer.tokenize(generated_text),
            generation_time_ms=generation_time,
            tokens_per_second=tokens_per_second,
            prompt_tokens=prompt_tokens,
            generated_tokens=generated_tokens,
            finish_reason="length",  # Simplified: pipeline doesn't expose it
            engine=InferenceEngine.TRANSFORMERS
        )

    def generate_batch(
        self,
        prompts: List[str],
        **kwargs
    ) -> List[GenerationResult]:
        """Generate responses for multiple prompts in batch.

        vLLM batches natively (continuous batching); the transformers path
        simply generates sequentially.
        """
        if not self._initialized:
            self.initialize()
        start_time = time.perf_counter()
        if self.engine == InferenceEngine.VLLM and self.llm:
            # vLLM batch generation
            sampling_params = SamplingParams(
                max_tokens=kwargs.get('max_tokens', 1024),
                temperature=kwargs.get('temperature', 0.7),
                top_p=kwargs.get('top_p', 0.95)
            )
            outputs = self.llm.generate(prompts, sampling_params)
            # BUG FIX: compute the per-prompt share of elapsed time once,
            # after generation — the original re-read the clock inside the
            # loop, so later items reported inflated times.
            generation_time = (time.perf_counter() - start_time) * 1000 / max(len(prompts), 1)
            results = []
            for output in outputs:
                generated_text = output.outputs[0].text
                generated_tokens = len(output.outputs[0].token_ids)
                prompt_tokens = len(output.prompt_token_ids)
                tokens_per_second = generated_tokens / (generation_time / 1000) if generation_time > 0 else 0
                results.append(GenerationResult(
                    text=generated_text,
                    tokens=[],
                    generation_time_ms=generation_time,
                    tokens_per_second=tokens_per_second,
                    prompt_tokens=prompt_tokens,
                    generated_tokens=generated_tokens,
                    finish_reason=output.outputs[0].finish_reason,
                    engine=InferenceEngine.VLLM
                ))
            return results
        else:
            # Transformers batch generation (sequential for simplicity)
            results = []
            for prompt in prompts:
                result = self.generate(prompt, **kwargs)
                results.append(result)
            return results

    def get_performance_stats(self) -> Dict[str, Any]:
        """Return aggregate performance statistics for this instance."""
        avg_time = self.total_time_ms / self.total_requests if self.total_requests > 0 else 0
        avg_tokens_per_second = self.total_tokens / (self.total_time_ms / 1000) if self.total_time_ms > 0 else 0
        return {
            "total_requests": self.total_requests,
            "total_tokens": self.total_tokens,
            "total_time_ms": self.total_time_ms,
            "avg_time_per_request_ms": avg_time,
            "avg_tokens_per_second": avg_tokens_per_second,
            "engine": self.engine.value,
            "model": self.model_name,
            "quantization": self.quantization
        }

    def __del__(self):
        """Best-effort release of the engine at teardown."""
        # BUG FIX: use getattr — if __init__ raised before self.llm was
        # assigned, the original `if self.llm` itself raised AttributeError.
        if getattr(self, "llm", None) is not None:
            del self.llm
# Process-wide singleton, created lazily on first use.
_llm_instance = None


def get_llm() -> UltraFastLLM:
    """Return the global UltraFastLLM, creating and initializing it lazily."""
    global _llm_instance
    if _llm_instance is None:
        instance = UltraFastLLM()
        instance.initialize()
        _llm_instance = instance
    return _llm_instance
# Test function: manual smoke test, run with `python <this file>`.
# NOTE(review): this downloads and loads a real model — network and
# several hundred MB of disk required.
if __name__ == "__main__":
    import logging
    logging.basicConfig(level=logging.INFO)
    print("\n🧪 Testing UltraFastLLM...")
    # Smallest Qwen chat model keeps the smoke test fast; the transformers
    # engine avoids a hard vLLM/GPU requirement.
    llm = UltraFastLLM(
        model_name="Qwen/Qwen2.5-0.5B-Instruct",
        engine=InferenceEngine.TRANSFORMERS  # Use transformers for testing
    )
    llm.initialize()
    # Test single generation
    prompt = "What is machine learning in simple terms?"
    print(f"\n📝 Prompt: {prompt}")
    result = llm.generate(prompt, max_tokens=100, temperature=0.7)
    print(f"\n🤖 Response: {result.text}")
    print(f"\n📊 Metrics:")
    print(f" Generation time: {result.generation_time_ms:.1f}ms")
    print(f" Tokens generated: {result.generated_tokens}")
    print(f" Tokens/sec: {result.tokens_per_second:.1f}")
    print(f" Engine: {result.engine.value}")
    # Test batch generation
    print("\n🧪 Testing batch generation...")
    prompts = [
        "Explain artificial intelligence",
        "What is deep learning?",
        "Describe natural language processing"
    ]
    results = llm.generate_batch(prompts, max_tokens=50)
    for i, (prompt, result) in enumerate(zip(prompts, results)):
        print(f"\n {i+1}. {prompt[:30]}...")
        print(f" Response: {result.text[:50]}...")
        print(f" Time: {result.generation_time_ms:.1f}ms")
    # Performance stats aggregated across all of the calls above
    stats = llm.get_performance_stats()
    print("\n📈 Overall Performance Statistics:")
    for key, value in stats.items():
        print(f" {key}: {value}")