from vllm import LLM, SamplingParams
from typing import List, Dict, Any, Optional
import logging
import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

logger = logging.getLogger(__name__)


class VLLMServer:
    def __init__(self,
                 model_name: str = "openai/gpt-oss-20b",
                 tensor_parallel_size: int = 1,
                 gpu_memory_utilization: float = 0.9):
        self.model_name = model_name
        self.tensor_parallel_size = tensor_parallel_size
        self.gpu_memory_utilization = gpu_memory_utilization
        self.llm = None
        self.executor = ThreadPoolExecutor(max_workers=4)

    def initialize(self):
        """Initialize the vLLM engine."""
        try:
            self.llm = LLM(
                model=self.model_name,
                tensor_parallel_size=self.tensor_parallel_size,
                gpu_memory_utilization=self.gpu_memory_utilization,
                trust_remote_code=True
            )
            logger.info(f"Initialized vLLM with model: {self.model_name}")
        except Exception as e:
            logger.error(f"Failed to initialize vLLM: {e}")
            raise

    def generate(self,
                 prompts: List[str],
                 max_tokens: int = 512,
                 temperature: float = 0.7,
                 top_p: float = 0.9,
                 stop: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """Generate completions for a list of prompts."""
        # Lazily initialize the engine on first use.
        if self.llm is None:
            self.initialize()

        sampling_params = SamplingParams(
            max_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            stop=stop
        )

        try:
            outputs = self.llm.generate(prompts, sampling_params)
            results = []
            for output in outputs:
                # Each RequestOutput can hold several candidate completions;
                # only the first candidate is surfaced here.
                results.append({
                    'text': output.outputs[0].text,
                    'prompt': output.prompt,
                    'finish_reason': output.outputs[0].finish_reason,
                    'token_ids': output.outputs[0].token_ids,
                    'logprobs': getattr(output.outputs[0], 'logprobs', None)
                })
            return results
        except Exception as e:
            logger.error(f"Generation failed: {e}")
            raise

    def generate_single(self, prompt: str, **kwargs) -> str:
        """Generate text for a single prompt."""
        results = self.generate([prompt], **kwargs)
        return results[0]['text'] if results else ""

    def generate_batch(self,
                       prompts: List[str],
                       batch_size: int = 8,
                       **kwargs) -> List[str]:
        """Generate text for multiple prompts in fixed-size batches."""
        all_results = []
        for i in range(0, len(prompts), batch_size):
            batch_prompts = prompts[i:i + batch_size]
            batch_results = self.generate(batch_prompts, **kwargs)
            all_results.extend([r['text'] for r in batch_results])
        return all_results

    async def generate_async(self,
                             prompts: List[str],
                             **kwargs) -> List[Dict[str, Any]]:
        """Run blocking generation on the thread pool without blocking the event loop."""
        loop = asyncio.get_running_loop()
        # run_in_executor does not forward keyword arguments, so bind them
        # with functools.partial before dispatching.
        return await loop.run_in_executor(
            self.executor,
            functools.partial(self.generate, prompts, **kwargs)
        )

    def get_model_info(self) -> Dict[str, Any]:
        """Return the engine configuration and initialization state."""
        return {
            'model_name': self.model_name,
            'tensor_parallel_size': self.tensor_parallel_size,
            'gpu_memory_utilization': self.gpu_memory_utilization,
            'is_initialized': self.llm is not None
        }

    def cleanup(self):
        """Release worker threads held by the executor."""
        if self.executor:
            self.executor.shutdown(wait=True)
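

# --- Usage sketch (illustrative, not part of the module above) ---
# A minimal example of driving VLLMServer, assuming a GPU host where the
# "openai/gpt-oss-20b" weights are reachable by vLLM. The prompt strings
# and sampling settings below are placeholders, not values from the source.
if __name__ == "__main__":
    server = VLLMServer(model_name="openai/gpt-oss-20b")
    try:
        # Single-prompt convenience wrapper; kwargs pass through to generate().
        answer = server.generate_single(
            "Explain tensor parallelism in one sentence.",
            max_tokens=64,
            temperature=0.2,
        )
        print(answer)

        # Batched generation over several prompts.
        texts = server.generate_batch(
            ["Prompt one", "Prompt two", "Prompt three"],
            batch_size=2,
            max_tokens=32,
        )
        for text in texts:
            print(text)
    finally:
        # Shut down the thread pool even if generation raised.
        server.cleanup()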