from typing import Optional, Union

import torch
import torchvision.transforms.v2.functional as tvF
from transformers.image_processing_base import BatchFeature
from transformers.image_processing_utils_fast import BaseImageProcessorFast
from transformers.image_transforms import group_images_by_shape, reorder_images
from transformers.image_utils import (
    ChannelDimension,
    ImageInput,
    PILImageResampling,
    SizeDict,
)
from transformers.processing_utils import Unpack
from transformers.utils import logging
from transformers.utils.auto_docstring import auto_docstring
from transformers.utils.constants import OPENAI_CLIP_MEAN, OPENAI_CLIP_STD
from transformers.utils.generic import TensorType

from .image_processing_qwen2_vl import ZFQwen2VLImageProcessorKwargs, smart_resize

logger = logging.get_logger(__name__)


@auto_docstring
class ZFQwen2VLImageProcessorFast(BaseImageProcessorFast):
    """Fast (torchvision-backed) image processor for the ZF Qwen2-VL variant.

    Images are resized so both sides are multiples of
    ``patch_size * merge_size * focus_size``, then split into
    ``temporal_patch_size x patch_size x patch_size`` patches that are flattened
    into ``pixel_values`` alongside a per-image ``image_grid_thw`` of
    ``(grid_t, grid_h, grid_w)`` counts.
    """

    do_resize = True
    resample = PILImageResampling.BICUBIC
    # "shortest_edge"/"longest_edge" here are *pixel-count* bounds
    # (min_pixels / max_pixels), not edge lengths — see smart_resize.
    size = {"shortest_edge": 56 * 56, "longest_edge": 28 * 28 * 1280}
    do_rescale = True
    do_normalize = True
    image_mean = OPENAI_CLIP_MEAN
    image_std = OPENAI_CLIP_STD
    do_convert_rgb = True
    patch_size = 14
    temporal_patch_size = 2
    merge_size = 2
    focus_size = 2
    valid_kwargs = ZFQwen2VLImageProcessorKwargs
    model_input_names = ["pixel_values", "image_grid_thw"]

    def __init__(self, **kwargs: Unpack[ZFQwen2VLImageProcessorKwargs]):
        size = kwargs.pop("size", None)
        min_pixels = kwargs.pop("min_pixels", None)
        max_pixels = kwargs.pop("max_pixels", None)
        # Backward compatibility: override size with min_pixels / max_pixels
        # if they are provided.
        # Copy before mutating: `self.size` is a class-level dict shared by all
        # instances, and a caller-supplied dict should not be modified either.
        size = dict(self.size) if size is None else dict(size)
        if min_pixels is not None:
            size["shortest_edge"] = min_pixels
            size.pop("min_pixels", None)
        if max_pixels is not None:
            size["longest_edge"] = max_pixels
            size.pop("max_pixels", None)
        if "shortest_edge" not in size or "longest_edge" not in size:
            raise ValueError("size must contain 'shortest_edge' and 'longest_edge' keys.")
        super().__init__(size=size, **kwargs)  # type: ignore

    def _further_process_kwargs(  # type: ignore
        self,
        size: SizeDict | None = None,
        min_pixels: int | None = None,
        max_pixels: int | None = None,
        **kwargs,
    ) -> dict:
        """
        Update kwargs that need further processing before being validated.
        Can be overridden by subclasses to customize the processing of kwargs.
        """
        if min_pixels is not None and max_pixels is not None:
            # Explicit pixel bounds win over any provided size.
            size = {"shortest_edge": min_pixels, "longest_edge": max_pixels}  # type: ignore
        elif size is not None:
            if "shortest_edge" not in size or "longest_edge" not in size:
                raise ValueError("size must contain 'shortest_edge' and 'longest_edge' keys.")
            min_pixels = size["shortest_edge"]
            max_pixels = size["longest_edge"]
        else:
            # Fall back to the instance defaults (copied so they stay pristine).
            size = {**self.size}  # type: ignore
        return super()._further_process_kwargs(size=size, **kwargs)

    @auto_docstring
    def preprocess(  # type: ignore
        self,
        images: ImageInput,
        **kwargs: Unpack[ZFQwen2VLImageProcessorKwargs],
    ) -> BatchFeature:
        return super().preprocess(images, **kwargs)

    def _preprocess_image_like_inputs(  # type: ignore
        self,
        images: ImageInput,
        do_convert_rgb: bool,
        input_data_format: ChannelDimension,
        device: Union[str, "torch.device"] | None = None,
        **kwargs: Unpack[ZFQwen2VLImageProcessorKwargs],  # type: ignore
    ) -> BatchFeature:
        """
        Preprocess image-like inputs.
        To be overridden by subclasses when image-like inputs other than images
        should be processed. It can be used for segmentation maps, depth maps, etc.
        """
        # Prepare input images (convert to tensors, optionally to RGB, move to device).
        images = self._prepare_image_like_inputs(
            images=images,
            do_convert_rgb=do_convert_rgb,
            input_data_format=input_data_format,
            device=device,  # type: ignore
        )
        return self._preprocess(images, **kwargs)  # type: ignore

    def _preprocess(  # type: ignore
        self,
        images: list["torch.Tensor"],
        do_resize: bool,
        size: SizeDict,
        interpolation: Optional["tvF.InterpolationMode"],
        do_rescale: bool,
        rescale_factor: float,
        do_normalize: bool,
        image_mean: float | list[float] | None,
        image_std: float | list[float] | None,
        patch_size: int,
        temporal_patch_size: int,
        merge_size: int,
        focus_size: int,
        disable_grouping: bool | None,
        return_tensors: str | TensorType | None,
        **kwargs,
    ):
        """Resize, normalize and patchify a list of image tensors.

        Returns a `BatchFeature` with `pixel_values` of shape
        ``(total_patches, channel * temporal_patch_size * patch_size**2)`` and
        `image_grid_thw` of shape ``(num_images, 3)``.
        """
        # Group images by size for batched resizing.
        grouped_images, grouped_images_index = group_images_by_shape(images, disable_grouping=disable_grouping)
        resized_images_grouped = {}
        for shape, stacked_images in grouped_images.items():
            height, width = stacked_images.shape[-2:]
            if do_resize:
                # Target dims must be divisible by patch * merge * focus so the
                # reshape below is exact.
                resized_height, resized_width = smart_resize(
                    height,
                    width,
                    factor=patch_size * merge_size * focus_size,
                    min_pixels=size["shortest_edge"],
                    max_pixels=size["longest_edge"],
                )
                stacked_images = self.resize(
                    image=stacked_images,
                    size=SizeDict(height=resized_height, width=resized_width),
                    interpolation=interpolation,
                )
            resized_images_grouped[shape] = stacked_images
        resized_images = reorder_images(resized_images_grouped, grouped_images_index)

        # Group images by size for further processing.
        # Needed in case do_resize is False, or resize returns images with different sizes.
        grouped_images, grouped_images_index = group_images_by_shape(resized_images, disable_grouping=disable_grouping)
        processed_images_grouped = {}
        processed_grids = {}
        for shape, stacked_images in grouped_images.items():
            resized_height, resized_width = stacked_images.shape[-2:]
            # Fused rescale and normalize.
            patches = self.rescale_and_normalize(
                stacked_images, do_rescale, rescale_factor, do_normalize, image_mean, image_std  # type: ignore
            )
            if patches.ndim == 4:
                # Add a temporal dimension if we have still images.
                patches = patches.unsqueeze(1)
            if patches.shape[1] % temporal_patch_size != 0:
                # Pad the temporal axis by repeating the last frame so it
                # divides evenly into temporal patches.
                repeats = patches[:, -1:].repeat(1, temporal_patch_size - 1, 1, 1, 1)
                patches = torch.cat([patches, repeats], dim=1)
            batch_size, grid_t, channel = patches.shape[:3]
            grid_t = grid_t // temporal_patch_size
            grid_h, grid_w = resized_height // patch_size, resized_width // patch_size
            patches = patches.view(
                batch_size,
                grid_t,
                temporal_patch_size,
                channel,
                grid_h // merge_size,
                merge_size,
                patch_size,
                grid_w // merge_size,
                merge_size,
                patch_size,
            )
            # Reorder dimensions to group grid and patch information for subsequent flattening.
            # (batch, grid_t, grid_h, grid_w, merge_h, merge_w, channel, temp_patch_size, patch_h, patch_w)
            patches = patches.permute(0, 1, 4, 7, 5, 8, 3, 2, 6, 9)
            flatten_patches = patches.reshape(
                batch_size,
                grid_t * grid_h * grid_w,
                channel * temporal_patch_size * patch_size * patch_size,
            )
            processed_images_grouped[shape] = flatten_patches
            processed_grids[shape] = [[grid_t, grid_h, grid_w]] * batch_size

        processed_images = reorder_images(processed_images_grouped, grouped_images_index)
        processed_grids = reorder_images(processed_grids, grouped_images_index)
        pixel_values = torch.cat(processed_images, dim=0)  # type: ignore
        image_grid_thw = torch.tensor(processed_grids)

        return BatchFeature(
            data={"pixel_values": pixel_values, "image_grid_thw": image_grid_thw}, tensor_type=return_tensors
        )

    def get_number_of_image_patches(self, height: int, width: int, images_kwargs=None):
        """
        A utility that returns number of image patches for a given image size.

        Note: Do not remove this method! It is used by vLLM to infer the number
        of patches and placeholders without an image input.

        Args:
            height (`int`):
                Height of the input image.
            width (`int`):
                Width of the input image.
            images_kwargs (`dict`, *optional*):
                Any kwargs to override defaults of the image processor.
        Returns:
            `int`: Number of image patches per image.
        """
        # Fix: the original subscripted `images_kwargs` directly, raising
        # TypeError whenever the documented default (None) was used.
        images_kwargs = images_kwargs or {}
        min_pixels = images_kwargs["min_pixels"] if "min_pixels" in images_kwargs else self.size["shortest_edge"]  # type: ignore
        max_pixels = images_kwargs["max_pixels"] if "max_pixels" in images_kwargs else self.size["longest_edge"]  # type: ignore
        patch_size = images_kwargs.get("patch_size", self.patch_size)
        merge_size = images_kwargs.get("merge_size", self.merge_size)
        focus_size = images_kwargs.get("focus_size", self.focus_size)

        factor = patch_size * merge_size * focus_size
        resized_height, resized_width = smart_resize(
            height, width, factor, min_pixels=min_pixels, max_pixels=max_pixels
        )
        grid_h, grid_w = resized_height // patch_size, resized_width // patch_size
        return grid_h * grid_w


__all__ = ["ZFQwen2VLImageProcessorFast"]