|
|
import copy |
|
|
import math |
|
|
import os |
|
|
from typing import Dict, List, Optional, Union |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from PIL import Image |
|
|
from transformers import Qwen2_5_VLProcessor |
|
|
from transformers.image_processing_utils import ( |
|
|
BaseImageProcessor, |
|
|
BatchFeature, |
|
|
get_size_dict, |
|
|
) |
|
|
from transformers.image_transforms import ( |
|
|
convert_to_rgb, |
|
|
get_resize_output_image_size, |
|
|
resize, |
|
|
to_channel_dimension_format, |
|
|
) |
|
|
from transformers.image_utils import ( |
|
|
OPENAI_CLIP_MEAN, |
|
|
OPENAI_CLIP_STD, |
|
|
ChannelDimension, |
|
|
ImageInput, |
|
|
PILImageResampling, |
|
|
get_image_size, |
|
|
infer_channel_dimension_format, |
|
|
is_scaled_image, |
|
|
make_list_of_images, |
|
|
to_numpy_array, |
|
|
valid_images, |
|
|
) |
|
|
from transformers.models.qwen2_5_vl.processing_qwen2_5_vl import ( |
|
|
Qwen2_5_VLProcessorKwargs, |
|
|
) |
|
|
from transformers.tokenization_utils_base import PreTokenizedInput, TextInput |
|
|
from transformers.utils import TensorType, logging |
|
|
from transformers.video_utils import VideoInput |
|
|
from typing_extensions import Unpack |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
|
|
|
|
|
def determine_possible_resolutions(anyres: bool, max_num_grids: int, grid_size: int, use_1x1_grid: bool = False):
    """Enumerate every (rows x cols) grid layout with at most ``max_num_grids`` cells.

    Each admissible layout is returned as ``[height, width]`` in pixels, i.e.
    ``[rows * grid_size, cols * grid_size]``. The 1x1 layout is skipped unless
    ``use_1x1_grid`` is True, and an empty list is returned when ``anyres`` is False.

    >>> determine_possible_resolutions(anyres=True, max_num_grids=4, grid_size=336)
    [[336, 672], [336, 1008], [336, 1344], [672, 336], [672, 672], [1008, 336], [1344, 336]]
    """
    if not anyres:
        return []

    assert max_num_grids > 0

    layouts = [
        (rows, cols)
        for rows in range(1, max_num_grids + 1)
        for cols in range(1, max_num_grids + 1)
        if rows * cols <= max_num_grids and (use_1x1_grid or (rows, cols) != (1, 1))
    ]

    return [[rows * grid_size, cols * grid_size] for rows, cols in layouts]
|
|
|
|
|
|
|
|
def divide_to_grids(image: np.array, grid_size: int, input_data_format=None) -> List[np.array]:
    """Split a local image into non-overlapping (grid_size x grid_size) tiles, row-major."""
    height, width = get_image_size(image, channel_dim=input_data_format)

    tiles = []
    for top in range(0, height, grid_size):
        for left in range(0, width, grid_size):
            if input_data_format == ChannelDimension.LAST:
                # channels-last: spatial axes come first
                tile = image[top : top + grid_size, left : left + grid_size]
            else:
                # channels-first: keep the channel axis, slice the spatial axes
                tile = image[:, top : top + grid_size, left : left + grid_size]
            tiles.append(tile)

    return tiles
|
|
|
|
|
|
|
|
def pad(image: np.array, target_size: tuple, background_color=(127, 127, 127), input_data_format=None) -> np.array:
    """Center *image* on a (target_height, target_width) canvas filled with *background_color*.

    NOTE(review): the canvas is always assembled channels-last (``image.shape[2]``
    is read as the channel count and the image is pasted into ``[y, x, :]``), so
    the input is effectively assumed to be channels-last — confirm with callers.
    """
    target_height, target_width = target_size
    height, width = get_image_size(image, channel_dim=input_data_format)

    num_channels = image.shape[2]
    canvas = np.empty((target_height, target_width, num_channels), dtype=image.dtype)
    for channel in range(num_channels):
        canvas[..., channel].fill(background_color[channel])

    # Paste the original image at the center of the padded canvas.
    offset_x = (target_width - width) // 2
    offset_y = (target_height - height) // 2
    canvas[offset_y : offset_y + height, offset_x : offset_x + width, :] = image

    return canvas
|
|
|
|
|
|
|
|
def expand2square(
    image: np.array, bboxes_dict=None, background_color=(127, 127, 127), input_data_format=None
) -> np.array:
    """Pad *image* to a square canvas, keeping the original content centered.

    The shorter side is padded equally on both ends with *background_color*,
    and any box arrays in *bboxes_dict* are shifted (in place) by the same
    offset so they still point at the same pixels.

    Args:
        image: numpy array. NOTE(review): the canvas is built channels-last
            (``image.shape[2]`` is read as the channel count) — confirm callers
            pass channels-last data.
        bboxes_dict: optional dict such as {"ocr": NDArray (N, 4, 2), "html": NDArray (N, 4, 2), ...}.
            Every box uses the order ``[[xtl, ytl], [xtr, ytr], [xbr, ybr], [xbl, ybl]]``,
            so boxes from several sources (OCR, HTML, ...) can be shifted at once.
        background_color: RGB fill tuple.

    Returns:
        Tuple of (square image, shifted bboxes_dict).
    """
    height, width = get_image_size(image, channel_dim=input_data_format)

    if width == height:
        # Already square: nothing to pad, boxes unchanged.
        return image, bboxes_dict

    side = max(width, height)
    canvas = np.empty((side, side, image.shape[2]), dtype=image.dtype)
    for channel in range(image.shape[2]):
        canvas[..., channel].fill(background_color[channel])

    if width > height:
        # Landscape: pad top/bottom, so y coordinates shift.
        offset = (width - height) // 2
        canvas[offset : offset + height, :] = image
        shift_axis = 1
    else:
        # Portrait: pad left/right, so x coordinates shift.
        offset = (height - width) // 2
        canvas[:, offset : offset + width] = image
        shift_axis = 0

    if bboxes_dict is not None:
        for key in bboxes_dict:
            bboxes_dict[key][:, :, shift_axis] += offset

    return canvas, bboxes_dict
|
|
|
|
|
|
|
|
def resize_longside(
    image: np.array,
    size: int,
    resample: PILImageResampling = PILImageResampling.BICUBIC,
    data_format: Optional[Union[str, ChannelDimension]] = None,
    input_data_format: Optional[Union[str, ChannelDimension]] = None,
):
    """Resize so the longer side becomes *size*, preserving the aspect ratio."""
    height, width = get_image_size(image, channel_dim=input_data_format)

    if width == height:
        target_height = target_width = size
    elif width > height:
        # Landscape: width is the long side; round the short side up.
        target_width = size
        target_height = math.ceil(height / width * size)
    else:
        # Portrait: height is the long side; round the short side up.
        target_height = size
        target_width = math.ceil(width / height * size)

    return resize(
        image,
        size=(target_height, target_width),
        resample=resample,
        data_format=data_format,
        input_data_format=input_data_format,
    )
|
|
|
|
|
|
|
|
def select_best_resolution(original_size: tuple, possible_resolutions: list) -> tuple:
    """From LLaVA-Next (https://github.com/huggingface/transformers/blob/v4.40.2/src/transformers/models/llava_next/image_processing_llava_next.py)

    Selects the best resolution from a list of possible resolutions based on the original size:
    maximize the effective (non-cropped, non-upscaled-beyond-original) resolution,
    and break ties by minimizing the wasted (padding) area.

    Args:
        original_size (tuple):
            The original size of the image in the format (height, width).
        possible_resolutions (list):
            A list of possible resolutions in the format [(height1, width1), (height2, width2), ...].

    Returns:
        tuple: The best fit resolution in the format (height, width).
    """
    original_height, original_width = original_size

    best_fit = None
    best_effective = 0
    best_wasted = float("inf")

    for height, width in possible_resolutions:
        # Scale the original image to fit inside this candidate without cropping.
        scale = min(width / original_width, height / original_height)
        scaled_width = int(original_width * scale)
        scaled_height = int(original_height * scale)

        # Useful pixels are capped at the original resolution (upscaling adds nothing).
        effective = min(scaled_width * scaled_height, original_width * original_height)
        wasted = (width * height) - effective

        # Lexicographic preference: more effective pixels, then less waste.
        if (effective, -wasted) > (best_effective, -best_wasted):
            best_effective = effective
            best_wasted = wasted
            best_fit = (height, width)

    return best_fit
|
|
|
|
|
|
|
|
def _get_local_grids_output_size(image: np.array, target_resolution: tuple, input_data_format=None):
    """Return the aspect-preserving (height, width) of *image* fitted inside *target_resolution*."""
    height, width = get_image_size(image, channel_dim=input_data_format)
    target_height, target_width = target_resolution

    width_ratio = target_width / width
    height_ratio = target_height / height

    if width_ratio < height_ratio:
        # Width is the binding constraint; cap the rounded height at the target.
        return min(math.ceil(height * width_ratio), target_height), target_width

    # Height is the binding constraint; cap the rounded width at the target.
    return target_height, min(math.ceil(width * height_ratio), target_width)
|
|
|
|
|
|
|
|
def determine_anyres_num_vision_patches(
    num_grids,
    image_size,
    grid_size,
    patch_size,
    possible_resolutions,
    anyres=False,
    unpad=True,
    num_queries_vis_abstractor=0,
    num_queries_vis_abstractor_slow=0,
    video=False,
    first_last_frames_slow=False,
    is_first_or_last_frames=False,
):
    """Compute the number of visual tokens one image (or video frame) contributes to the LLM input.

    Args:
        num_grids: not referenced in this function; kept for interface compatibility.
        image_size: original (height, width) of the image.
        grid_size: pixel size of one square grid cell.
        patch_size: ViT patch size.
        possible_resolutions: candidate (height, width) layouts for anyres.
        anyres: when False, a single grid's token count is returned.
        unpad: when True, tokens covering pure padding are dropped and one
            row-separator token per remaining row is added.
        num_queries_vis_abstractor: resampler query count per grid (0 = no resampler).
        num_queries_vis_abstractor_slow: query count for "slow" video frames (0 = disabled).
        video: True for video frames (no extra global-view tokens).
        first_last_frames_slow: apply the slow-frame adjustment only to first/last frames.
        is_first_or_last_frames: whether this frame is a first/last frame.

    Returns:
        int: the visual token count.
    """
    if not anyres:
        # Single-grid mode: resampler query count, or the raw ViT patch grid.
        return num_queries_vis_abstractor if num_queries_vis_abstractor > 0 else (grid_size // patch_size) ** 2

    if num_queries_vis_abstractor > 0:
        # Resampler output is assumed to form a square token grid
        # — NOTE(review): confirm num_queries_vis_abstractor is a perfect square.
        num_patch_per_grid = int(num_queries_vis_abstractor**0.5)
    else:
        num_patch_per_grid = grid_size // patch_size

    num_global_per_grid = num_patch_per_grid

    # Best-fitting anyres layout for this image.
    height, width = select_best_resolution(image_size, possible_resolutions)

    # Token-grid dimensions across all local grids.
    num_patch_height = (height // grid_size) * num_patch_per_grid
    num_patch_width = (width // grid_size) * num_patch_per_grid

    if unpad:
        original_height, original_width = image_size

        original_aspect_ratio = original_width / original_height
        current_aspect_ratio = num_patch_width / num_patch_height

        # Trim token rows/columns that would cover pure padding (same math as
        # LLaVA-Next's unpad_image, but on the token grid).
        if original_aspect_ratio > current_aspect_ratio:
            scale_factor = num_patch_width / original_width
            new_height = int(original_height * scale_factor)
            padding = (num_patch_height - new_height) // 2
            num_patch_height = num_patch_height - padding * 2
        else:
            scale_factor = num_patch_height / original_height
            new_width = int(original_width * scale_factor)
            padding = (num_patch_width - new_width) // 2
            num_patch_width = num_patch_width - padding * 2

        # "+ num_patch_height": one row-separator token per remaining token row.
        num_patches = num_patch_width * num_patch_height + num_patch_height
    else:
        num_patches = num_patch_width * num_patch_height

    if num_queries_vis_abstractor_slow > 0:
        # Slow-frame (slowfast video) adjustment: swap the fast query count for the slow one.
        # NOTE(review): both branches apply the same delta; the only frames left
        # unadjusted are non-first/last frames when first_last_frames_slow is set.
        if first_last_frames_slow:
            if is_first_or_last_frames:
                num_patches += num_queries_vis_abstractor_slow - num_queries_vis_abstractor
        else:
            num_patches += num_queries_vis_abstractor_slow - num_queries_vis_abstractor

        # The slowfast path requires unpad to be disabled.
        assert unpad is False

    if not video:
        # Still images additionally carry the global-view grid tokens.
        num_patches += num_global_per_grid**2

    return num_patches
|
|
|
|
|
|
|
|
class HCXVisionImageProcessor(BaseImageProcessor):
    r"""
    Constructs a VLM image processor. Based on [`CLIPImageProcessor`] with incorporation of additional techniques for processing high resolution images.

    Args:
        anyres: (bool) whether to enable "anyres" multi-grid handling of high-resolution images
        unpad: (bool) with anyres, whether to drop visual tokens that correspond to pure padding from the LLM input
        num_queries_vis_abstractor: (int) number of visual queries per grid when a resampler is applied to each grid
        possible_resolutions: (List) candidate resolutions for anyres, e.g. [[336, 336], [336, 672], [672, 336]]
        patch_size: (int) ViT patch size
        pad_to_square: (bool) whether to pad images to a square; when False the (non-square) image is center-cropped before entering the ViT
    """

    model_input_names = ["pixel_values"]

    def __init__(
        self,
        do_resize: bool = True,
        size: Dict[str, int] = None,
        anyres: bool = False,
        unpad: bool = False,
        num_queries_vis_abstractor: int = 0,
        possible_resolutions: List = [],
        patch_size: int = 14,
        pad_to_square: bool = True,
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        do_center_crop: bool = True,
        crop_size: Dict[str, int] = None,
        do_rescale: bool = True,
        rescale_factor: Union[int, float] = 1 / 255,
        do_normalize: bool = True,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_convert_rgb: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        # Defaults follow the CLIP ViT-L/14-336 convention: 336-px shortest edge, 336x336 crop.
        size = size if size is not None else {"shortest_edge": 336}
        size = get_size_dict(size, default_to_square=False)
        crop_size = crop_size if crop_size is not None else {"height": 336, "width": 336}
        crop_size = get_size_dict(crop_size, default_to_square=True, param_name="crop_size")

        self.do_resize = do_resize
        self.size = size
        self.anyres = anyres
        self.unpad = unpad
        self.num_queries_vis_abstractor = num_queries_vis_abstractor
        # Copy so the (mutable) default list is never shared across instances.
        self.possible_resolutions = [_resolution for _resolution in possible_resolutions]
        self.patch_size = patch_size
        self.pad_to_square = pad_to_square
        self.resample = resample
        self.do_center_crop = do_center_crop
        self.crop_size = crop_size
        self.do_rescale = do_rescale
        self.rescale_factor = rescale_factor
        self.do_normalize = do_normalize
        self.image_mean = image_mean if image_mean is not None else OPENAI_CLIP_MEAN
        self.image_std = image_std if image_std is not None else OPENAI_CLIP_STD
        self.do_convert_rgb = do_convert_rgb

    def resize(
        self,
        image: np.ndarray,
        size: Dict[str, int],
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        data_format: Optional[Union[str, ChannelDimension]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs,
    ) -> np.ndarray:
        """Resize an image.

        ``size`` is either ``{"shortest_edge": n}`` (aspect-preserving) or
        ``{"height": h, "width": w}`` (exact output size).
        """
        default_to_square = True
        if "shortest_edge" in size:
            size = size["shortest_edge"]
            default_to_square = False
        elif "height" in size and "width" in size:
            size = (size["height"], size["width"])
        else:
            raise ValueError("Size must contain either 'shortest_edge' or 'height' and 'width'.")

        output_size = get_resize_output_image_size(
            image,
            size=size,
            default_to_square=default_to_square,
            input_data_format=input_data_format,
        )

        return resize(
            image,
            size=output_size,
            resample=resample,
            data_format=data_format,
            input_data_format=input_data_format,
            **kwargs,
        )

    def _preprocess(
        self,
        images: ImageInput,
        do_resize: bool = None,
        size: Dict[str, int] = None,
        resample: PILImageResampling = None,
        do_center_crop: bool = None,
        crop_size: int = None,
        do_rescale: bool = None,
        rescale_factor: float = None,
        do_normalize: bool = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
    ) -> Image.Image:
        """Apply the standard pipeline (resize -> center-crop -> rescale -> normalize)
        to each image, then convert every image to ``data_format``."""
        images = make_list_of_images(images)

        if do_resize:
            images = [
                self.resize(image=image, size=size, resample=resample, input_data_format=input_data_format)
                for image in images
            ]

        if do_center_crop:
            images = [
                self.center_crop(image=image, size=crop_size, input_data_format=input_data_format) for image in images
            ]

        if do_rescale:
            images = [
                self.rescale(image=image, scale=rescale_factor, input_data_format=input_data_format) for image in images
            ]

        if do_normalize:
            images = [
                self.normalize(image=image, mean=image_mean, std=image_std, input_data_format=input_data_format)
                for image in images
            ]

        images = [
            to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format) for image in images
        ]

        return images

    def _resize_for_local_grids(
        self, image: np.array, target_resolution: tuple, resample, input_data_format: ChannelDimension
    ) -> np.array:
        """Resize *image* to fit inside *target_resolution* while keeping its aspect ratio."""
        new_height, new_width = _get_local_grids_output_size(image, target_resolution, input_data_format)

        resized_image = resize(image, (new_height, new_width), resample=resample, input_data_format=input_data_format)

        return resized_image

    def _pad_for_patching(
        self, image: np.array, target_resolution: tuple, input_data_format: ChannelDimension
    ) -> np.array:
        """
        Pad an image to a target resolution while maintaining aspect ratio.
        """
        target_height, target_width = target_resolution

        # The pad color is the (0-255 scaled) normalization mean so padding is
        # neutral after normalization.
        background_color = tuple(int(x * 255) for x in self.image_mean)
        padded_image = pad(
            image,
            target_size=(target_height, target_width),
            background_color=background_color,
            input_data_format=input_data_format,
        )

        return padded_image

    def get_image_grids(
        self,
        image: np.array,
        possible_resolutions,
        grid_size: int,
        resample: PILImageResampling,
        data_format: ChannelDimension,
        input_data_format: ChannelDimension,
    ) -> List[np.array]:
        """Pick the best anyres resolution for *image*, resize + pad to it, and cut
        the result into (grid_size x grid_size) local grids in ``data_format``."""
        if not isinstance(possible_resolutions, list):
            raise ValueError("possible_resolutions must be a list of possible resolutions.")

        image_size = get_image_size(image, channel_dim=input_data_format)
        best_resolution = select_best_resolution(image_size, possible_resolutions)
        resized_image = self._resize_for_local_grids(
            image, best_resolution, resample=resample, input_data_format=input_data_format
        )
        padded_image = self._pad_for_patching(resized_image, best_resolution, input_data_format=input_data_format)
        local_grids = divide_to_grids(padded_image, grid_size=grid_size, input_data_format=input_data_format)

        local_grids = [
            to_channel_dimension_format(grid, channel_dim=data_format, input_channel_dim=input_data_format)
            for grid in local_grids
        ]

        return local_grids

    def preprocess(
        self,
        images: ImageInput,
        do_resize: bool = None,
        size: Dict[str, int] = None,
        anyres: bool = None,
        unpad: bool = None,
        video: bool = None,
        num_queries_vis_abstractor: int = None,
        possible_resolutions: List = None,
        patch_size: int = None,
        pad_to_square: bool = None,
        resample: PILImageResampling = None,
        do_center_crop: bool = None,
        crop_size: int = None,
        do_rescale: bool = None,
        rescale_factor: float = None,
        do_normalize: bool = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_convert_rgb: bool = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        return_dummy_image: bool = False,
        num_queries_vis_abstractor_slow: int = 0,
        first_last_frames_slow: bool = False,
        is_first_or_last_frames: bool = False,
    ):
        """Produce image tensors, original image sizes, and visual-token counts.

        NOTE(review): ``return_tensors`` is accepted but never used in this body
        — confirm whether it should be honored.

        :return pixel_values: list of 4D image tensors (one stacked tensor of grids per input image)
        :return image_sizes: list of dicts [{"width": image 1 width, "height": image 1 height}, {"width": image 2 width, "height": image 2 height}, ...]
        :return vision_query_lengths: list of ints — the number of visual tokens each image occupies in the LLM input
        """
        # Resolve every option against the instance defaults.
        do_resize = do_resize if do_resize is not None else self.do_resize
        size = size if size is not None else self.size
        size = get_size_dict(size, param_name="size", default_to_square=False)
        anyres = anyres if anyres is not None else self.anyres
        unpad = unpad if unpad is not None else self.unpad
        if video:
            # Unpadding is never applied to video frames.
            unpad = False
        num_queries_vis_abstractor = (
            num_queries_vis_abstractor if num_queries_vis_abstractor is not None else self.num_queries_vis_abstractor
        )
        possible_resolutions = possible_resolutions if possible_resolutions is not None else self.possible_resolutions
        patch_size = patch_size if patch_size is not None else self.patch_size
        pad_to_square = pad_to_square if pad_to_square is not None else self.pad_to_square
        resample = resample if resample is not None else self.resample
        do_center_crop = do_center_crop if do_center_crop is not None else self.do_center_crop
        crop_size = crop_size if crop_size is not None else self.crop_size
        crop_size = get_size_dict(crop_size, param_name="crop_size", default_to_square=True)
        do_rescale = do_rescale if do_rescale is not None else self.do_rescale
        rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor
        do_normalize = do_normalize if do_normalize is not None else self.do_normalize
        image_mean = image_mean if image_mean is not None else self.image_mean
        image_std = image_std if image_std is not None else self.image_std
        do_convert_rgb = do_convert_rgb if do_convert_rgb is not None else self.do_convert_rgb

        if return_dummy_image:
            # Replace the input with a black 224x224 placeholder (vision_query_lengths
            # is cleared at the end for this path).
            images = Image.new("RGB", (224, 224), (0, 0, 0))

        images = make_list_of_images(images)

        if not valid_images(images):
            raise ValueError(
                "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        if do_convert_rgb:
            images = [convert_to_rgb(image) for image in images]

        images = [to_numpy_array(image) for image in images]

        if is_scaled_image(images[0]) and do_rescale:
            logger.warning_once(
                "It looks like you are trying to rescale already rescaled images. If the input"
                " images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again."
            )

        if input_data_format is None:
            # Infer the channel layout from the first image and assume it for all.
            input_data_format = infer_channel_dimension_format(images[0])

        new_images = []
        # Original sizes, captured before any resizing/padding.
        image_sizes = [get_image_size(image, channel_dim=input_data_format) for image in images]
        vision_query_lengths = []

        # Grids are assumed square throughout.
        assert crop_size["height"] == crop_size["width"]

        if anyres:
            # Build the global (whole-image) view separately from the local grids.
            anyres_global_images = copy.deepcopy(images)
            if pad_to_square:
                background_color = tuple(int(x * 255) for x in self.image_mean)
                anyres_global_images = [
                    resize_longside(copy.deepcopy(image), size["shortest_edge"], resample, input_data_format)
                    for image in anyres_global_images
                ]
                anyres_global_images = [
                    expand2square(image, background_color=background_color, input_data_format=input_data_format)[0]
                    for image in anyres_global_images
                ]
            else:
                anyres_global_images = [
                    self.resize(
                        image=image,
                        size={"height": size["shortest_edge"], "width": size["shortest_edge"]},
                        resample=resample,
                        input_data_format=input_data_format,
                    )
                    for image in anyres_global_images
                ]
        else:
            anyres_global_images = [None for _ in range(len(images))]
            if pad_to_square:
                background_color = tuple(int(x * 255) for x in self.image_mean)
                images = [
                    resize_longside(image, size["shortest_edge"], resample, input_data_format) for image in images
                ]
                images = [
                    expand2square(image, background_color=background_color, input_data_format=input_data_format)[0]
                    for image in images
                ]

        for image, anyres_global_image, image_size in zip(images, anyres_global_images, image_sizes):
            if anyres:
                # Cut the image into local grids at the best-fitting resolution.
                image_grids = self.get_image_grids(
                    image,
                    possible_resolutions,
                    grid_size=crop_size["height"],
                    resample=resample,
                    data_format=input_data_format,
                    input_data_format=input_data_format,
                )

                if not video:
                    # Still images get the global view prepended; video frames do not.
                    image_grids = [anyres_global_image] + image_grids
            else:
                image_grids = [image]

            pixel_values = self._preprocess(
                image_grids,
                do_resize=do_resize,
                size=size,
                resample=resample,
                do_center_crop=do_center_crop,
                crop_size=crop_size,
                do_rescale=do_rescale,
                rescale_factor=rescale_factor,
                do_normalize=do_normalize,
                image_mean=image_mean,
                image_std=image_std,
                data_format=data_format,
                input_data_format=input_data_format,
            )

            # Stack the grids into one (num_grids, C, H, W) array per image.
            pixel_values = np.array(pixel_values)
            new_images.append(pixel_values)

            num_grids = pixel_values.shape[0]

            vision_query_length = determine_anyres_num_vision_patches(
                num_grids=num_grids,
                image_size=image_size,
                grid_size=crop_size["height"],
                patch_size=patch_size,
                possible_resolutions=possible_resolutions,
                anyres=anyres,
                unpad=unpad,
                num_queries_vis_abstractor=num_queries_vis_abstractor,
                num_queries_vis_abstractor_slow=num_queries_vis_abstractor_slow,
                video=video,
                first_last_frames_slow=first_last_frames_slow,
                is_first_or_last_frames=is_first_or_last_frames,
            )

            vision_query_lengths.append(vision_query_length)

        if return_dummy_image:
            # Dummy images contribute no visual tokens.
            vision_query_lengths = []

        data = {
            "pixel_values": [torch.tensor(new_image) for new_image in new_images],
            "image_sizes": [{"width": image_size[1], "height": image_size[0]} for image_size in image_sizes],
            "vision_query_lengths": vision_query_lengths,
        }

        return BatchFeature(data=data)

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        *args,
        **kwargs,
    ):
        """Save the processor config, registering this class for AutoImageProcessor loading."""
        self.register_for_auto_class()
        super().save_pretrained(save_directory, *args, **kwargs)
|
|
|
|
|
|
|
|
class HCXVisionV2Processor(Qwen2_5_VLProcessor):
    """Qwen2.5-VL-based processor bundling an image processor, a video processor,
    and a GPT2-style tokenizer for HCX vision models."""

    attributes = ["image_processor", "tokenizer", "video_processor"]
    image_processor_class = "AutoImageProcessor"
    video_processor_class = "AutoVideoProcessor"
    tokenizer_class = ("GPT2Tokenizer", "GPT2TokenizerFast", "PreTrainedTokenizer", "PreTrainedTokenizerFast")

    def __init__(self, image_processor=None, tokenizer=None, video_processor=None, chat_template=None, **kwargs):
        # NOTE(review): the `chat_template` argument and **kwargs are ignored;
        # the tokenizer's own chat template is always used — confirm intentional.
        # Also, a None tokenizer would raise AttributeError below.
        self.tokenizer = tokenizer
        super().__init__(image_processor, tokenizer, video_processor, chat_template=self.tokenizer.chat_template)

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        *args,
        **kwargs,
    ):
        """Save the processor, registering this class for AutoProcessor loading."""
        self.register_for_auto_class()
        super().save_pretrained(save_directory, *args, **kwargs)

    def __call__(
        self,
        images: ImageInput = None,
        text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
        videos: VideoInput = None,
        **kwargs: Unpack[Qwen2_5_VLProcessorKwargs],
    ) -> BatchFeature:
        """
        Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text`
        and `kwargs` arguments to Qwen2TokenizerFast's [`~Qwen2TokenizerFast.__call__`] if `text` is not `None` to encode
        the text. To prepare the vision inputs, this method forwards the `vision_infos` and `kwrags` arguments to
        Qwen2VLImageProcessor's [`~Qwen2VLImageProcessor.__call__`] if `vision_infos` is not `None`.

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. Both channels-first and channels-last formats are supported.
            text (`str`, `list[str]`, `list[list[str]]`):
                The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
            videos (`np.ndarray`, `torch.Tensor`, `list[np.ndarray]`, `list[torch.Tensor]`):
                The image or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch
                tensor, or a nested list of 3D frames. Both channels-first and channels-last formats are supported.
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:
                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:

            - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
            - **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`.
            - **image_grid_thw** -- List of image 3D grid in LLM. Returned when `images` is not `None`.
            - **video_grid_thw** -- List of video 3D grid in LLM. Returned when `videos` is not `None`.
        """
        output_kwargs = self._merge_kwargs(
            Qwen2_5_VLProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        image_inputs = videos_inputs = {}
        if images is not None:
            image_inputs = self.image_processor(images=images, **output_kwargs["images_kwargs"])
            image_grid_thw = image_inputs["image_grid_thw"]

        if videos is not None:
            videos_inputs = self.video_processor(videos=videos, **output_kwargs["videos_kwargs"])
            video_grid_thw = videos_inputs["video_grid_thw"]

        if not isinstance(text, list):
            text = [text]

        # Copy so the caller's list is not mutated by the placeholder expansion below.
        text = text.copy()

        if images is not None:
            # Each image token expands to (t*h*w / merge_size^2) placeholder tokens.
            merge_length = self.image_processor.merge_size**2
            index = 0
            for i in range(len(text)):
                while self.image_token in text[i]:
                    num_image_tokens = image_grid_thw[index].prod() // merge_length
                    text[i] = text[i].replace(self.image_token, "<|placeholder|>" * num_image_tokens, 1)
                    # NOTE(review): `images[i]` indexes by text position, not by the
                    # running image counter `index` — with multiple images per text
                    # entry (or lists of texts/images of different lengths) this looks
                    # wrong; confirm whether `images[index]` was intended. Also assumes
                    # each element of `images` is a PIL image (has `.size`).
                    text[i] = text[i].replace(
                        '{"resolution": [w, h]}', '{"resolution": ' + str(list(images[i].size)) + "}"
                    )
                    index += 1
                text[i] = text[i].replace("<|placeholder|>", self.image_token)

        if videos is not None:
            merge_length = self.video_processor.merge_size**2
            index = 0
            for i in range(len(text)):
                while self.video_token in text[i]:
                    num_video_tokens = video_grid_thw[index].prod() // merge_length
                    text[i] = text[i].replace(self.video_token, "<|placeholder|>" * num_video_tokens, 1)
                    index += 1
                text[i] = text[i].replace("<|placeholder|>", self.video_token)

        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
        return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", False)
        text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"], return_tensors=None)
        self._check_special_mm_tokens(text, text_inputs, modalities=["image", "video"])

        if return_mm_token_type_ids:
            # NOTE(review): only image tokens are marked here; video tokens keep
            # type id 0 — confirm whether that is intended.
            array_ids = np.array(text_inputs["input_ids"])
            mm_token_type_ids = np.zeros_like(text_inputs["input_ids"])
            mm_token_type_ids[array_ids == self.image_token_id] = 1
            text_inputs["mm_token_type_ids"] = mm_token_type_ids.tolist()

        return BatchFeature(data={**text_inputs, **image_inputs, **videos_inputs}, tensor_type=return_tensors)
|
|
|