from typing import Optional, Union

import torch
from torchvision.transforms.v2 import functional as F

from transformers.image_processing_base import BatchFeature
from transformers.image_processing_utils_fast import (
    BaseImageProcessorFast,
    DefaultFastImageProcessorKwargs,
)
from transformers.image_transforms import group_images_by_shape, reorder_images
from transformers.image_utils import (
    ChannelDimension,
    ImageInput,
    PILImageResampling,
    SizeDict,
)
from transformers.processing_utils import Unpack
from transformers.utils import logging
from transformers.utils.auto_docstring import auto_docstring
from transformers.utils.constants import OPENAI_CLIP_MEAN, OPENAI_CLIP_STD
from transformers.utils.generic import TensorType
from transformers.video_utils import VideoInput, make_batched_videos

from .image_processing_qwen2_vl import smart_resize

logger = logging.get_logger(__name__)


class Qwen2VLFastImageProcessorKwargs(DefaultFastImageProcessorKwargs):
    """
    min_pixels (`int`, *optional*, defaults to `56 * 56`):
        The min pixels of the image to resize the image.
    max_pixels (`int`, *optional*, defaults to `28 * 28 * 1280`):
        The max pixels of the image to resize the image.
    patch_size (`int`, *optional*, defaults to 14):
        The spatial patch size of the vision encoder.
    temporal_patch_size (`int`, *optional*, defaults to 2):
        The temporal patch size of the vision encoder.
    merge_size (`int`, *optional*, defaults to 2):
        The merge size of the vision encoder to llm encoder.
    focus_size (`int`, *optional*, defaults to 2):
        The focus size of the ZoomFocus.
    """

    min_pixels: Optional[int]
    max_pixels: Optional[int]
    patch_size: Optional[int]
    temporal_patch_size: Optional[int]
    merge_size: Optional[int]
    focus_size: Optional[int]


@auto_docstring
class ZFQwen2VLImageProcessorFast(BaseImageProcessorFast):
    # Default preprocessing configuration. `size` bounds the total pixel count of
    # the resized image via smart_resize (shortest_edge = min pixels, longest_edge
    # = max pixels), not edge lengths.
    do_resize = True
    resample = PILImageResampling.BICUBIC
    size = {"shortest_edge": 56 * 56, "longest_edge": 28 * 28 * 1280}
    do_rescale = True
    do_normalize = True
    image_mean = OPENAI_CLIP_MEAN
    image_std = OPENAI_CLIP_STD
    do_convert_rgb = True
    patch_size = 14
    temporal_patch_size = 2
    merge_size = 2
    focus_size = 2
    min_pixels = None
    max_pixels = None
    valid_kwargs = Qwen2VLFastImageProcessorKwargs
    model_input_names = ["pixel_values", "image_grid_thw", "pixel_values_videos", "video_grid_thw"]

    def __init__(self, **kwargs: Unpack[Qwen2VLFastImageProcessorKwargs]):
        """Initialize the processor, reconciling the legacy `min_pixels`/`max_pixels`
        kwargs with the canonical `size` dict.

        Raises:
            ValueError: if the resulting `size` lacks 'shortest_edge' or 'longest_edge'.
        """
        size = kwargs.pop("size", None)
        min_pixels = kwargs.pop("min_pixels", None)
        max_pixels = kwargs.pop("max_pixels", None)
        # backward compatibility: override size with min_pixels and max_pixels if they are provided.
        # Copy before mutating: `self.size` here is the *class-level* default dict, and the
        # caller-provided dict should not be modified in place either.
        size = dict(self.size) if size is None else dict(size)
        if min_pixels is not None:
            size["shortest_edge"] = min_pixels  # type: ignore
            size.pop("min_pixels", None)
        if max_pixels is not None:
            size["longest_edge"] = max_pixels  # type: ignore
            size.pop("max_pixels", None)
        if "shortest_edge" not in size or "longest_edge" not in size:
            raise ValueError("size must contain 'shortest_edge' and 'longest_edge' keys.")
        super().__init__(size=size, min_pixels=min_pixels, max_pixels=max_pixels, **kwargs)  # type: ignore

    def _further_process_kwargs(  # type: ignore
        self,
        size: Optional[SizeDict] = None,
        min_pixels: Optional[int] = None,
        max_pixels: Optional[int] = None,
        **kwargs,
    ) -> dict:
        """
        Update kwargs that need further processing before being validated
        Can be overridden by subclasses to customize the processing of kwargs.
        """
        # Keep `size` and the legacy min/max pixel kwargs mutually consistent:
        # whichever pair is provided wins; otherwise fall back to the class default.
        if min_pixels is not None and max_pixels is not None:
            size = {"shortest_edge": min_pixels, "longest_edge": max_pixels}  # type: ignore
        elif size is not None:
            if "shortest_edge" not in size or "longest_edge" not in size:
                raise ValueError("size must contain 'shortest_edge' and 'longest_edge' keys.")
            min_pixels = size["shortest_edge"]
            max_pixels = size["longest_edge"]
        else:
            size = {**self.size}  # type: ignore
        return super()._further_process_kwargs(size=size, min_pixels=min_pixels, max_pixels=max_pixels, **kwargs)

    @auto_docstring
    def preprocess(  # type: ignore
        self,
        images: ImageInput,
        videos: Optional[VideoInput] = None,
        **kwargs: Unpack[Qwen2VLFastImageProcessorKwargs],
    ) -> BatchFeature:
        return super().preprocess(images, videos, **kwargs)

    def _preprocess_image_like_inputs(  # type: ignore
        self,
        images: ImageInput,
        videos: VideoInput,
        do_convert_rgb: bool,
        input_data_format: ChannelDimension,
        device: Optional[Union[str, "torch.device"]] = None,
        **kwargs: Unpack[DefaultFastImageProcessorKwargs],  # type: ignore
    ) -> BatchFeature:
        """
        Preprocess image-like inputs. To be overridden by subclasses when image-like inputs other than images
        should be processed. It can be used for segmentation maps, depth maps, etc.
        """
        # Prepare input images
        batch_feature = BatchFeature()
        if images is not None:
            images = self._prepare_image_like_inputs(
                images=images, do_convert_rgb=do_convert_rgb, input_data_format=input_data_format, device=device  # type: ignore
            )
            batch_feature = self._preprocess(images, **kwargs)  # type: ignore
        if videos is not None:
            logger.warning(
                "`Qwen2VLImageProcessorFast` works only with image inputs and doesn't process videos anymore. "
                "This is a deprecated behavior and will be removed in v5.0. "
                "Your videos should be forwarded to `Qwen2VLVideoProcessor`. "
            )
            # Can't change _prepare_images_structure to work with videos because it also needs to work with images.
            videos = make_batched_videos(videos)  # type: ignore
            videos = [
                torch.stack(self._prepare_image_like_inputs(video, do_convert_rgb, input_data_format, device))  # type: ignore
                for video in videos
            ]
            video_outputs = self._preprocess(videos, **kwargs)  # type: ignore
            batch_feature.update(
                {"pixel_values_videos": video_outputs.pixel_values, "video_grid_thw": video_outputs.image_grid_thw}
            )
        return batch_feature

    def _preprocess(  # type: ignore
        self,
        images: list["torch.Tensor"],
        do_resize: bool,
        size: SizeDict,
        interpolation: Optional["F.InterpolationMode"],
        do_rescale: bool,
        rescale_factor: float,
        do_normalize: bool,
        image_mean: Optional[Union[float, list[float]]],
        image_std: Optional[Union[float, list[float]]],
        patch_size: int,
        temporal_patch_size: int,
        merge_size: int,
        focus_size: int,
        disable_grouping: Optional[bool],
        return_tensors: Optional[Union[str, TensorType]],
        **kwargs,
    ):
        """Resize, rescale/normalize and patchify a batch of images (or stacked videos),
        returning flattened vision patches plus their (t, h, w) grid sizes.

        Returns:
            `BatchFeature` with keys `pixel_values` (num_patches x patch_dim) and
            `image_grid_thw` (one [grid_t, grid_h, grid_w] row per input).
        """
        # Group images by size for batched resizing
        grouped_images, grouped_images_index = group_images_by_shape(images, disable_grouping=disable_grouping)  # type: ignore
        resized_images_grouped = {}
        for shape, stacked_images in grouped_images.items():
            height, width = stacked_images.shape[-2:]  # type: ignore
            if do_resize:
                # smart_resize keeps the aspect ratio while snapping both edges to a
                # multiple of patch_size * merge_size * focus_size and clamping the
                # total pixel count to [shortest_edge, longest_edge].
                resized_height, resized_width = smart_resize(
                    height,
                    width,
                    factor=patch_size * merge_size * focus_size,
                    min_pixels=size["shortest_edge"],
                    max_pixels=size["longest_edge"],
                )
                stacked_images = self.resize(
                    image=stacked_images,  # type: ignore
                    size=SizeDict(height=resized_height, width=resized_width),
                    interpolation=interpolation,
                )
            resized_images_grouped[shape] = stacked_images
        resized_images = reorder_images(resized_images_grouped, grouped_images_index)

        # Group images by size for further processing
        # Needed in case do_resize is False, or resize returns images with different sizes
        grouped_images, grouped_images_index = group_images_by_shape(resized_images, disable_grouping=disable_grouping)  # type: ignore
        processed_images_grouped = {}
        processed_grids = {}
        for shape, stacked_images in grouped_images.items():
            resized_height, resized_width = stacked_images.shape[-2:]  # type: ignore
            # Fused rescale and normalize
            patches = self.rescale_and_normalize(
                stacked_images, do_rescale, rescale_factor, do_normalize, image_mean, image_std  # type: ignore
            )
            if patches.ndim == 4:
                # add a temporal dimension if we have images
                patches = patches.unsqueeze(1)
            if patches.shape[1] % temporal_patch_size != 0:
                # Pad the temporal axis by repeating the last frame so it divides evenly.
                repeats = patches[:, -1:].repeat(1, temporal_patch_size - 1, 1, 1, 1)
                patches = torch.cat([patches, repeats], dim=1)
            batch_size, grid_t, channel = patches.shape[:3]
            grid_t = grid_t // temporal_patch_size
            grid_h, grid_w = resized_height // patch_size, resized_width // patch_size
            patches = patches.view(
                batch_size,
                grid_t,
                temporal_patch_size,
                channel,
                grid_h // merge_size,
                merge_size,
                patch_size,
                grid_w // merge_size,
                merge_size,
                patch_size,
            )
            # Reorder dimensions to group grid and patch information for subsequent flattening.
            # (batch, grid_t, grid_h, grid_w, merge_h, merge_w, channel, temp_patch_size, patch_h, patch_w)
            patches = patches.permute(0, 1, 4, 7, 5, 8, 3, 2, 6, 9)
            flatten_patches = patches.reshape(
                batch_size,
                grid_t * grid_h * grid_w,
                channel * temporal_patch_size * patch_size * patch_size,
            )
            processed_images_grouped[shape] = flatten_patches
            processed_grids[shape] = [[grid_t, grid_h, grid_w]] * batch_size

        processed_images = reorder_images(processed_images_grouped, grouped_images_index)
        processed_grids = reorder_images(processed_grids, grouped_images_index)
        pixel_values = torch.cat(processed_images, dim=0)  # type: ignore
        image_grid_thw = torch.tensor(processed_grids)

        return BatchFeature(
            data={"pixel_values": pixel_values, "image_grid_thw": image_grid_thw}, tensor_type=return_tensors
        )

    def get_number_of_image_patches(self, height: int, width: int, images_kwargs=None):
        """
        A utility that returns number of image patches for a given image size.
        Note: Do not remove this method! It is used by vLLM to infer the number of patches and
        placeholders without an image input.

        Args:
            height (`int`):
                Height of the input image.
            width (`int`):
                Width of the input image.
            images_kwargs (`dict`, *optional*)
                Any kwargs to override defaults of the image processor.
        Returns:
            `int`: Number of image patches per image.
        """
        # BUGFIX: honor the documented `images_kwargs=None` default — the membership
        # tests below would raise TypeError on None.
        images_kwargs = images_kwargs if images_kwargs is not None else {}
        min_pixels = images_kwargs["min_pixels"] if "min_pixels" in images_kwargs else self.size["shortest_edge"]  # type: ignore
        max_pixels = images_kwargs["max_pixels"] if "max_pixels" in images_kwargs else self.size["longest_edge"]  # type: ignore
        patch_size = images_kwargs.get("patch_size", self.patch_size)  # type: ignore
        merge_size = images_kwargs.get("merge_size", self.merge_size)  # type: ignore
        focus_size = images_kwargs.get("focus_size", self.focus_size)  # type: ignore

        factor = patch_size * merge_size * focus_size
        resized_height, resized_width = smart_resize(
            height, width, factor, min_pixels=min_pixels, max_pixels=max_pixels
        )
        grid_h, grid_w = resized_height // patch_size, resized_width // patch_size
        return grid_h * grid_w


__all__ = ["ZFQwen2VLImageProcessorFast"]