from threading import Thread
import logging
import time

import pynvml
import torch
from qwen_vl_utils import process_vision_info
from transformers import (
    AutoProcessor,
    Qwen2VLForConditionalGeneration,
    Qwen2_5_VLForConditionalGeneration,
    TextIteratorStreamer,
)


class Qwen2VL:
    """Wraps a Qwen2-VL / Qwen2.5-VL model for streamed video-to-text generation."""

    def __init__(self, model_id):
        self.model_id = model_id
        # Qwen2.5-VL uses a different model class than Qwen2-VL.
        if "2.5" in model_id:
            self.model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
                model_id, torch_dtype=torch.float16, device_map="auto"
            )
        else:
            self.model = Qwen2VLForConditionalGeneration.from_pretrained(
                model_id, torch_dtype=torch.float16, device_map="auto"
            )
        self.processor = AutoProcessor.from_pretrained(model_id)

        # Set up NVML so generate() can report GPU memory usage.
        self.handle = None
        if torch.cuda.is_available():
            try:
                pynvml.nvmlInit()
                device_id = next(self.model.parameters()).device.index
                self.handle = pynvml.nvmlDeviceGetHandleByIndex(device_id)
            except Exception as e:
                logging.error(f"Failed to initialize NVML: {e}")

    def __del__(self):
        # Shut down NVML if we initialized it; ignore errors during interpreter teardown.
        if hasattr(self, "handle") and self.handle:
            try:
                pynvml.nvmlShutdown()
            except Exception:
                pass

    def generate(self, video, prompt):
        # Build a single-turn chat message containing the video frames and the prompt.
        video_paths = [f"file://{path}" for path in video]
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "video",
                        "video": video_paths,
                        "resized_height": 280,
                        "resized_width": 420,
                    },
                    {"type": "text", "text": prompt},
                ],
            }
        ]
        text = self.processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )

        # Preprocess the vision inputs and tokenize the full prompt.
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = self.processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(self.model.device)
        logging.info(f"Prompt token length: {len(inputs.input_ids[0])}")

        # Stream tokens from a background generation thread so we can print
        # them as they arrive.
        streamer = TextIteratorStreamer(
            self.processor.tokenizer, skip_prompt=True, skip_special_tokens=True
        )
        generation_kwargs = dict(**inputs, streamer=streamer, max_new_tokens=256)

        thread = Thread(target=self.model.generate, kwargs=generation_kwargs)
        thread.start()
        full_response = ""
        print("Response: ", end="")
        first_token_time = None
        for new_text in streamer:
            if first_token_time is None:
                first_token_time = time.time()
            full_response += new_text
            print(new_text, end="", flush=True)
        print()
        thread.join()

        end_time = time.time()

        # Tokens/second is measured over the decoding phase only
        # (first streamed token to last).
        if first_token_time is not None:
            generation_time = end_time - first_token_time
        else:
            generation_time = 0

        num_generated_tokens = len(self.processor.tokenizer(full_response).input_ids)
        tokens_per_second = num_generated_tokens / generation_time if generation_time > 0 else 0

        # NVML reports current device memory, so this is a post-generation
        # snapshot used as a proxy for peak usage.
        peak_memory_mb = 0
        if self.handle:
            mem_info = pynvml.nvmlDeviceGetMemoryInfo(self.handle)
            peak_memory_mb = mem_info.used / (1024 * 1024)

        return {
            "response": full_response,
            "tokens_per_second": tokens_per_second,
            "peak_gpu_memory_mb": peak_memory_mb,
            "num_generated_tokens": num_generated_tokens,
        }
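

# Example usage: a minimal sketch of how this class is meant to be driven.
# The model ID is a real Hugging Face checkpoint, but the frame paths and
# prompt below are hypothetical placeholders, not part of this module.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    qwen = Qwen2VL("Qwen/Qwen2.5-VL-3B-Instruct")
    # `video` is a list of local frame paths; generate() adds the file:// prefix.
    result = qwen.generate(
        video=["frames/frame_0001.jpg", "frames/frame_0002.jpg"],
        prompt="Describe what happens in this video.",
    )
    print(
        f"\n{result['num_generated_tokens']} tokens at "
        f"{result['tokens_per_second']:.1f} tok/s, "
        f"{result['peak_gpu_memory_mb']:.0f} MB GPU memory in use"
    )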