import base64
import re
from io import BytesIO
from typing import List, Optional, Tuple, Union
import decord
import numpy as np
import torch
from accelerate import Accelerator, DistributedType
from loguru import logger as eval_logger
from PIL import Image
from tqdm import tqdm
from transformers import AutoProcessor, AutoTokenizer, Qwen2VLForConditionalGeneration
# TODO: Consider moving flatten to lmms_eval.utils
# from lmms_eval import utils
from lmms_eval.api.instance import Instance
from lmms_eval.api.model import lmms
from lmms_eval.api.registry import register_model
from lmms_eval.models.model_utils.load_video import load_video_decord
try:
from qwen_vl_utils import process_vision_info
except ImportError:
eval_logger.warning("Failed to import qwen_vl_utils; Please install it via `pip install qwen-vl-utils`")
@register_model("qwen2_vl")
class Qwen2_VL(lmms):
    """
    Qwen2_VL Model
    "https://github.com/QwenLM/Qwen2-VL"

    lmms-eval wrapper around `Qwen2VLForConditionalGeneration` that accepts
    image (PIL) and video-file visuals. Only `generate_until` is implemented;
    `loglikelihood` and `generate_until_multi_round` raise NotImplementedError.
    """
    def __init__(
        self,
        pretrained: str = "Qwen/Qwen2-VL-7B-Instruct",
        device: Optional[str] = "cuda",
        device_map: Optional[str] = "cuda",
        batch_size: Optional[Union[int, str]] = 1,
        use_cache: bool = True,
        use_flash_attention_2: Optional[bool] = False,
        max_length: Optional[int] = 2048,  # exposed through the `max_length` property
        max_pixels: int = 602112,
        min_pixels: int = 3136,
        max_num_frames: int = 32,
        system_prompt: Optional[str] = "You are a helpful assistant.",
        interleave_visuals: Optional[bool] = False,
        reasoning_prompt: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Load model, processor and tokenizer and set up (optional) data parallelism.

        Args:
            pretrained: HF hub id or local path of the Qwen2-VL checkpoint.
            device: Target device string used when running single-process.
            device_map: Passed to `from_pretrained`; overridden per-rank when
                more than one accelerate process is detected.
            batch_size: Per-GPU batch size (coerced with `int(...)`).
            use_cache: Forwarded to `model.generate(use_cache=...)`.
            use_flash_attention_2: Load with `attn_implementation="flash_attention_2"`.
            max_length: Stored as `_max_length`; see the `max_length` property.
            max_pixels / min_pixels: Vision-resolution bounds given to the processor
                and attached to each visual payload in `generate_until`.
            max_num_frames: Upper bound on sampled video frames in `generate_until`.
            system_prompt: Content of the leading system message.
            interleave_visuals: If True, place images at `<image N>` markers in the text.
            reasoning_prompt: Optional suffix appended to each context; literal
                "\\n" sequences are converted to real newlines.
            **kwargs: Must be empty; any extra kwarg is rejected.
        """
        super().__init__()
        # Do not use kwargs for now
        assert kwargs == {}, f"Unexpected kwargs: {kwargs}"
        accelerator = Accelerator()
        if accelerator.num_processes > 1:
            # Multi-process: pin this rank to its own GPU for both torch and HF sharding.
            self._device = torch.device(f"cuda:{accelerator.local_process_index}")
            self.device_map = f"cuda:{accelerator.local_process_index}"
        else:  # accelerator.num_processes == 1
            self._device = torch.device(device)
            # Respect device_map if provided and not empty, otherwise fall back to `device`.
            self.device_map = device_map if device_map else device
        if use_flash_attention_2:
            self._model = Qwen2VLForConditionalGeneration.from_pretrained(
                pretrained,
                torch_dtype="auto",
                device_map=self.device_map,
                attn_implementation="flash_attention_2",
            ).eval()
        else:
            self._model = Qwen2VLForConditionalGeneration.from_pretrained(pretrained, torch_dtype="auto", device_map=self.device_map).eval()
        # Pixel bounds are given to the processor and also re-attached per visual later.
        self.processor = AutoProcessor.from_pretrained(pretrained, max_pixels=max_pixels, min_pixels=min_pixels)
        self.max_pixels = max_pixels
        self.min_pixels = min_pixels
        self.max_num_frames = max_num_frames
        if reasoning_prompt:
            # Allow literal "\n" in CLI-provided prompts to mean a real newline.
            self.reasoning_prompt = reasoning_prompt.replace("\\n", "\n")
        else:
            self.reasoning_prompt = None
        self.system_prompt = system_prompt
        self.interleave_visuals = interleave_visuals
        self._tokenizer = AutoTokenizer.from_pretrained(pretrained)
        self._config = self.model.config
        # NOTE(review): `_max_length` takes the constructor value as-is; the
        # `max_length` property lazily falls back to config.max_position_embeddings
        # only if this ends up None.
        self._max_length = max_length
        self.batch_size_per_gpu = int(batch_size)
        self.use_cache = use_cache
        if accelerator.num_processes > 1:
            assert accelerator.distributed_type in [
                DistributedType.FSDP,
                DistributedType.MULTI_GPU,
            ], "Unsupported distributed type provided. Only DDP and FSDP are supported."
            if accelerator.distributed_type == DistributedType.FSDP:
                self._model = accelerator.prepare(self.model)
            else:
                # Evaluation mode: wrap without optimizer/scheduler plumbing.
                self._model = accelerator.prepare_model(self.model, evaluation_mode=True)
            self.accelerator = accelerator
            if self.accelerator.is_local_main_process:
                eval_logger.info(f"Using {accelerator.num_processes} devices with data parallelism")
            self._rank = self.accelerator.local_process_index
            self._world_size = self.accelerator.num_processes
        else:
            self._rank = 0
            self._world_size = 1
@property
def config(self):
# return the associated transformers.AutoConfig for the given pretrained model.
return self._config
@property
def tokenizer(self):
return self._tokenizer
@property
def model(self):
# returns the model, unwrapping it if using Accelerate
if hasattr(self, "accelerator"):
return self.accelerator.unwrap_model(self._model)
else:
return self._model
@property
def eot_token_id(self):
return self.tokenizer.eos_token_id
@property
def max_length(self):
# Ensure _max_length is initialized
if not hasattr(self, "_max_length") or self._max_length is None:
# Fallback or raise error if not initialized
# Example: Attempt to get from config if not set
try:
self._max_length = self.model.config.max_position_embeddings
except AttributeError:
raise AttributeError("'_max_length' was not initialized and could not be inferred from model config.")
return self._max_length
@property
def batch_size(self):
return self.batch_size_per_gpu
@property
def device(self):
return self._device
@property
def rank(self):
return self._rank
@property
def world_size(self):
return self._world_size
def loglikelihood(self, requests: List[Instance]) -> List[Tuple[float, bool]]:
raise NotImplementedError("Loglikelihood is not implemented for Qwen2_VL")
# TODO: Consider moving flatten to lmms_eval.utils if it's general purpose
def flatten(self, input):
new_list = []
for i in input:
for j in i:
new_list.append(j)
return new_list
    def generate_until(self, requests: List[Instance]) -> List[str]:
        """Generate one text completion per request.

        Requests are grouped by identical generation kwargs, sorted by (negative)
        tokenized context length for stable batching, processed in batches of
        `self.batch_size`, and results are restored to the original order before
        returning.

        Args:
            requests: lmms-eval Instances whose `.args` unpack to
                (context, gen_kwargs, doc_to_visual, doc_id, task, split).

        Returns:
            List of generated strings, one per request, in input order.
        """
        res = []

        def _collate(x):
            # the negative sign on len(toks) sorts descending - this has a few advantages:
            # - time estimates will always be over not underestimates, which is more useful for planning
            # - to know the size of a batch when going through the list, you know the first one is always the batch
            # padded context length. this is useful to simplify the batching logic and more importantly to make
            # automatic adaptive batches much much easier to implement
            # - any OOMs will happen right away rather than near the end
            toks = self.tokenizer.encode(x[0])
            return -len(toks), x[0]

        # Import utils here if flatten is moved
        import lmms_eval.utils as utils

        pbar = tqdm(total=len(requests), disable=(self.rank != 0), desc="Model Responding")
        # we group requests by their generation_kwargs,
        # so that we don't try to execute e.g. greedy sampling and temp=0.8 sampling
        # in the same batch.
        re_ords = utils.Collator([reg.args for reg in requests], _collate, grouping=True)
        chunks = re_ords.get_batched(n=self.batch_size, batch_fn=None)
        for chunk in chunks:
            contexts, all_gen_kwargs, doc_to_visual, doc_id, task, split = zip(*chunk)
            task = task[0]
            split = split[0]
            # TODO: Clarify the behavior of doc_to_visual for documents without visual info.
            # The current logic might incorrectly discard all visuals if one doc lacks them.
            visual_list = [doc_to_visual[0](self.task_dict[task][split][ids]) for ids in doc_id]
            if None in visual_list:
                # NOTE(review): if ANY doc returns None, the whole batch's visuals are
                # dropped. If a mix of visual/non-visual docs is possible this needs
                # per-document handling instead — confirm against task definitions.
                visual_list = []
            else:
                visual_list = self.flatten(visual_list)  # assumes doc_to_visual returns list of lists
            gen_kwargs = all_gen_kwargs[0] if all_gen_kwargs else {}
            # Default stop sequence: the decoded EOS token.
            until = [self.tokenizer.decode(self.eot_token_id)]
            # Update values from gen_kwargs if present
            if "until" in gen_kwargs:
                until_from_kwargs = gen_kwargs.pop("until")
                if isinstance(until_from_kwargs, str):
                    until = [until_from_kwargs]
                elif isinstance(until_from_kwargs, list):
                    until = until_from_kwargs
                else:
                    raise ValueError(f"Expected `gen_kwargs['until']` to be of type Union[str,list] but got {type(until_from_kwargs)}")
            if isinstance(contexts, tuple):
                contexts = list(contexts)
            # Remove image tags from context text itself, as they are handled separately
            contexts = [ctx.replace("<image>", "") for ctx in contexts]
            batched_messages = []
            # TODO: Consider refactoring message construction logic (especially visual processing)
            # into helper methods for clarity (e.g., _prepare_message, _process_visuals).
            for i, context in enumerate(contexts):
                message = [{"role": "system", "content": self.system_prompt}]
                current_context = context  # work on a copy; `contexts` is left unchanged
                if self.reasoning_prompt:
                    current_context = current_context.strip() + self.reasoning_prompt
                processed_visuals = []
                # NOTE(review): every context in the batch currently sees the whole
                # flattened `visual_list`; mapping visuals back to their originating
                # context index is not implemented — verify against doc_to_visual output.
                relevant_visuals = visual_list
                for visual in relevant_visuals:
                    if isinstance(visual, str) and visual.endswith((".mp4", ".avi", ".mov")):  # video file path
                        try:
                            # Open once to validate the file is non-empty before referencing it.
                            vr = decord.VideoReader(visual)
                            if len(vr) > 0:
                                first_frame = vr[0].asnumpy()
                                height, width = first_frame.shape[:2]
                                # Pixel bounds come from the instance config, not the frame size.
                                processed_visuals.append({"type": "video", "video": visual, "max_pixels": self.max_pixels, "min_pixels": self.min_pixels})
                            else:
                                eval_logger.warning(f"Skipping empty video: {visual}")
                        except Exception as e:
                            eval_logger.error(f"Failed to process video {visual}: {e}")
                    elif isinstance(visual, Image.Image):  # PIL image -> base64 data URI
                        try:
                            base64_image = visual.convert("RGB")
                            buffer = BytesIO()
                            base64_image.save(buffer, format="JPEG")
                            base64_bytes = base64.b64encode(buffer.getvalue())
                            base64_string = base64_bytes.decode("utf-8")
                            processed_visuals.append({"type": "image", "image": f"data:image/jpeg;base64,{base64_string}", "max_pixels": self.max_pixels, "min_pixels": self.min_pixels})
                        except Exception as e:
                            eval_logger.error(f"Failed to process PIL image: {e}")
                # Other visual types are silently ignored.
                if not self.interleave_visuals:
                    # Add all visuals first, then the text
                    content_payload = processed_visuals + [{"type": "text", "text": current_context}]
                    message.append(
                        {
                            "role": "user",
                            "content": content_payload,
                        }
                    )
                else:  # interleave visuals at their `<image N>` placeholders
                    image_placeholders = re.findall(r"<image \d+>", current_context)
                    content_parts = []
                    text_parts = re.split(r"<image \d+>", current_context)
                    if text_parts[0]:
                        content_parts.append({"type": "text", "text": text_parts[0]})
                    for idx, placeholder in enumerate(image_placeholders):
                        try:
                            img_idx_match = re.search(r"<image (\d+)>", placeholder)
                            if img_idx_match:
                                img_idx = int(img_idx_match.group(1)) - 1  # placeholders are 1-based
                                # Map text index to available processed visuals
                                if 0 <= img_idx < len(processed_visuals):
                                    content_parts.append(processed_visuals[img_idx])
                                else:
                                    eval_logger.warning(f"Image index {img_idx + 1} out of range for available visuals ({len(processed_visuals)}) in context.")
                            else:
                                eval_logger.warning(f"Could not parse index from placeholder: {placeholder}")
                        except Exception as e:
                            eval_logger.error(f"Error processing placeholder {placeholder}: {e}")
                        # Add the text part following this placeholder
                        if idx + 1 < len(text_parts) and text_parts[idx + 1]:
                            content_parts.append({"type": "text", "text": text_parts[idx + 1]})
                    message.append(
                        {
                            "role": "user",
                            "content": content_parts,
                        }
                    )
                batched_messages.append(message)
            texts = [self.processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True) for msg in batched_messages]
            # TODO: Consider moving video frame sampling logic into process_vision_info or a helper.
            image_inputs, video_inputs = process_vision_info(batched_messages)
            if video_inputs is not None and len(video_inputs) > 0 and video_inputs[0] is not None:
                # NOTE(review): only the FIRST video entry is subsampled; batches with
                # multiple videos leave the rest untouched — confirm intended.
                video_tensor = video_inputs[0]
                if isinstance(video_tensor, torch.Tensor) and video_tensor.ndim > 0 and video_tensor.shape[0] > 0:
                    # Evenly sample up to max_num_frames frames, always including the last frame.
                    total_frames = video_tensor.shape[0]
                    indices = np.linspace(0, total_frames - 1, self.max_num_frames, dtype=int, endpoint=True)
                    # linspace can repeat indices when total_frames < max_num_frames
                    indices = np.unique(indices)
                    if len(indices) > self.max_num_frames:
                        # Defensive: cannot normally happen after np.unique, but re-derive
                        # an evenly spaced selection if it somehow does.
                        indices = np.linspace(0, total_frames - 1, self.max_num_frames, dtype=int, endpoint=True)
                        indices = np.unique(indices)
                    video_inputs[0] = video_tensor[indices]
                else:
                    eval_logger.warning(f"Unexpected video_inputs format or empty tensor: {type(video_tensor)}")
            inputs = self.processor(text=texts, images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt")
            if self.device_map == "auto":
                inputs = inputs.to("cuda")  # assumes 'cuda' is the target for 'auto' on a single GPU
            else:
                inputs = inputs.to(self.device)
            # Set default generation kwargs first, then override with user-provided ones
            default_gen_kwargs = {
                "max_new_tokens": 128,
                "temperature": 0.0,  # default to greedy decoding
                "top_p": None,
                "num_beams": 1,
            }
            current_gen_kwargs = {**default_gen_kwargs, **gen_kwargs}  # provided gen_kwargs override defaults
            pad_token_id = self.tokenizer.pad_token_id if self.tokenizer.pad_token_id is not None else self.tokenizer.eos_token_id
            cont = self.model.generate(
                **inputs,
                eos_token_id=self.tokenizer.eos_token_id,
                pad_token_id=pad_token_id,
                do_sample=True if current_gen_kwargs["temperature"] > 0 else False,
                temperature=current_gen_kwargs["temperature"],
                top_p=current_gen_kwargs["top_p"],
                num_beams=current_gen_kwargs["num_beams"],
                max_new_tokens=current_gen_kwargs["max_new_tokens"],
                use_cache=self.use_cache,
            )
            # Decode generated sequences, excluding the (padded) input tokens.
            generated_ids_trimmed = []
            for in_ids, out_ids in zip(inputs.input_ids, cont):
                input_len = len(in_ids)
                try:
                    # Find first eos token in the generated part
                    eos_pos = (out_ids[input_len:] == self.tokenizer.eos_token_id).nonzero(as_tuple=True)[0]
                    if len(eos_pos) > 0:
                        # Slice generated part up to (but not including) the first EOS token
                        generated_ids_trimmed.append(out_ids[input_len : input_len + eos_pos[0]])
                    else:
                        # No EOS found, take the whole generated part
                        generated_ids_trimmed.append(out_ids[input_len:])
                except IndexError:  # output shorter than input (shouldn't happen with generate)
                    generated_ids_trimmed.append(torch.tensor([], dtype=torch.long, device=out_ids.device))
            answers = self.processor.batch_decode(generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)
            # Truncate each answer at the earliest occurrence of any stop sequence.
            for i, ans in enumerate(answers):
                stop_pos = len(ans)  # default: keep the whole string
                for term in until:
                    if term and term in ans:  # skip empty stop strings
                        stop_pos = min(stop_pos, ans.index(term))
                answers[i] = ans[:stop_pos].strip()
            for ans, context in zip(answers, contexts):
                res.append(ans)
                # NOTE(review): `gen_kwargs` had "until" popped above, so the cache key
                # is the mutated dict, not the caller's original — confirm acceptable.
                self.cache_hook.add_partial("generate_until", (context, gen_kwargs), ans)
                pbar.update(1)
        # reorder this group of results back to original unsorted form
        res = re_ords.get_original(res)
        pbar.close()
        return res
def generate_until_multi_round(self, requests) -> List[str]:
raise NotImplementedError("TODO: Implement multi-round generation")