SimpleSeg-Qwen2.5-VL / processing_opencua.py
sthui's picture
Upload folder using huggingface_hub
474caea verified
from transformers.models.qwen2_5_vl.processing_qwen2_5_vl import Qwen2_5_VLProcessor
from transformers.activations import ACT2FN
from transformers.cache_utils import Cache
from transformers.configuration_utils import PretrainedConfig
from transformers.feature_extraction_utils import BatchFeature
from transformers.image_utils import ImageInput
from transformers.modeling_flash_attention_utils import is_flash_attn_available
from transformers.modeling_layers import GradientCheckpointingLayer
from transformers.processing_utils import ImagesKwargs, MultiModalData, ProcessingKwargs, ProcessorMixin, Unpack, VideosKwargs
from transformers.tokenization_utils_base import PreTokenizedInput, TextInput
from transformers.utils import is_torchdynamo_compiling, logging
from transformers.video_utils import VideoInput
from transformers.models.qwen2_5_vl.processing_qwen2_5_vl import Qwen2_5_VLProcessorKwargs
import torch
import numpy as np
from typing import Union, Optional
# from typing import Union, Optional, TypedDict
from PIL import Image
if is_flash_attn_available():
pass
logger = logging.get_logger(__name__)
# class Qwen2_5_VLVideosProcessorKwargs(VideosKwargs, total=False):
# fps: Union[list[float], float]
# class TokenizerChatTemplateKwargs(TypedDict, total=False):
# """
# Keyword arguments for tokenizer's `apply_chat_template`, when it is called from within a processor.
# tools (`list[Dict]`, *optional*):
# A list of tools (callable functions) that will be accessible to the model. If the template does not
# support function calling, this argument will have no effect. Each tool should be passed as a JSON Schema,
# giving the name, description and argument types for the tool. See our
# [chat templating guide](https://huggingface.co/docs/transformers/main/en/chat_templating#automated-function-conversion-for-tool-use)
# for more information.
# documents (`list[dict[str, str]]`, *optional*):
# A list of dicts representing documents that will be accessible to the model if it is performing RAG
# (retrieval-augmented generation). If the template does not support RAG, this argument will have no
# effect. We recommend that each document should be a dict containing "title" and "text" keys. Please
# see the RAG section of the [chat templating guide](https://huggingface.co/docs/transformers/main/en/chat_templating#arguments-for-RAG)
# for examples of passing documents with chat templates.
# add_generation_prompt (bool, *optional*):
# If this is set, a prompt with the token(s) that indicate
# the start of an assistant message will be appended to the formatted output. This is useful when you want to generate a response from the model.
# Note that this argument will be passed to the chat template, and so it must be supported in the
# template for this argument to have any effect.
# continue_final_message (bool, *optional*):
# If this is set, the chat will be formatted so that the final
# message in the chat is open-ended, without any EOS tokens. The model will continue this message
# rather than starting a new one. This allows you to "prefill" part of
# the model's response for it. Cannot be used at the same time as `add_generation_prompt`.
# return_assistant_tokens_mask (`bool`, defaults to `False`):
# Whether to return a mask of the assistant generated tokens. For tokens generated by the assistant,
# the mask will contain 1. For user and system tokens, the mask will contain 0.
# This functionality is only available for chat templates that support it via the `{% generation %}` keyword.
# """
# tools: Optional[list[dict]] = None
# documents: Optional[list[dict[str, str]]] = None
# add_generation_prompt: Optional[bool] = False
# continue_final_message: Optional[bool] = False
# return_assistant_tokens_mask: Optional[bool] = False
# class ChatTemplateLoadKwargs(TypedDict, total=False):
# """
# Keyword arguments used to load multimodal data in processor chat templates.
# num_frames (`int`, *optional*):
# Number of frames to sample uniformly. If not passed, the whole video is loaded.
# load_audio_from_video (`bool`, *optional*):
# Whether to use the audio track of input video. If `True` the audio track will be loaded and passed to the
# processor. This flag has no effect if the model doesn't support audio modality.
# """
# sampling_rate: Optional[int] = 16_000
# load_audio_from_video: Optional[bool] = False
# class ProcessorChatTemplateKwargs(ChatTemplateLoadKwargs, TokenizerChatTemplateKwargs, total=False):
# """
# Keyword arguments for processor's `apply_chat_template`.
# tokenize (`bool`, *optional*, defaults to `False`):
# Whether to tokenize the output or not.
# return_dict (`bool`, defaults to `False`):
# Whether to return a dictionary with named outputs. Has no effect if tokenize is `False`.
# """
# tokenize: Optional[bool] = False
# return_dict: Optional[bool] = False
# class AllKwargsForChatTemplate(TypedDict, total=False):
# processor_kwargs: ProcessingKwargs
# mm_load_kwargs: ChatTemplateLoadKwargs
# template_kwargs: ProcessorChatTemplateKwargs
# class Qwen2_5_VLImagesKwargs(ImagesKwargs):
# min_pixels: Optional[int]
# max_pixels: Optional[int]
# patch_size: Optional[int]
# temporal_patch_size: Optional[int]
# merge_size: Optional[int]
# class Qwen2_5_VLProcessorKwargs(ProcessingKwargs, total=False):
# images_kwargs: Qwen2_5_VLImagesKwargs
# videos_kwargs: Qwen2_5_VLVideosProcessorKwargs
# _defaults = {
# "text_kwargs": {
# "padding": False,
# "return_mm_token_type_ids": False,
# },
# }
class OpenCUAProcessor(Qwen2_5_VLProcessor):
attributes = ["image_processor", "tokenizer", "video_processor"]
image_processor_class = "AutoImageProcessor"
video_processor_class = "AutoVideoProcessor"
tokenizer_class = "AutoTokenizer"
def __init__(self,
image_processor: None,
tokenizer: None,
video_processor: None,
**kwargs,
):
super().__init__(image_processor, tokenizer, video_processor, **kwargs)
self.image_token = "<|media_placeholder|>" if not hasattr(tokenizer, "image_token") else tokenizer.image_token
self.video_token = "<|media_placeholder|>" if not hasattr(tokenizer, "video_token") else tokenizer.video_token
self.image_token_id = (
tokenizer.image_token_id
if getattr(tokenizer, "image_token_id", None)
else tokenizer.convert_tokens_to_ids(self.image_token)
)
self.video_token_id = (
tokenizer.video_token_id
if getattr(tokenizer, "video_token_id", None)
else tokenizer.convert_tokens_to_ids(self.video_token)
)
self.chat_template = self.tokenizer.chat_template
self.bos_token = self.tokenizer.bos_token
self.eos_token = self.tokenizer.eos_token
self.pad_token = self.tokenizer.pad_token
self.unk_token = self.tokenizer.unk_token
def __call__(
self,
images: Optional[ImageInput] = None,
text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
videos: Optional[VideoInput] = None,
**kwargs: Unpack[Qwen2_5_VLProcessorKwargs],
) -> BatchFeature:
"""
Main method to prepare for the model one or several sequences(s) and image(s). This method forwards the `text`
and `kwargs` arguments to Qwen2TokenizerFast's [`~Qwen2TokenizerFast.__call__`] if `text` is not `None` to encode
the text. To prepare the vision inputs, this method forwards the `vision_infos` and `kwargs` arguments to
Qwen2VLImageProcessor's [`~Qwen2VLImageProcessor.__call__`] if `vision_infos` is not `None`.
Args:
images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`):
The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
tensor. Both channels-first and channels-last formats are supported.
text (`str`, `list[str]`, `list[list[str]]`):
The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
(pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
`is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
videos (`np.ndarray`, `torch.Tensor`, `list[np.ndarray]`, `list[torch.Tensor]`):
The image or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch
tensor, or a nested list of 3D frames. Both channels-first and channels-last formats are supported.
return_tensors (`str` or [`~utils.TensorType`], *optional*):
If set, will return tensors of a particular framework. Acceptable values are:
- `'pt'`: Return PyTorch `torch.Tensor` objects.
- `'np'`: Return NumPy `np.ndarray` objects.
Returns:
[`BatchFeature`]: A [`BatchFeature`] with the following fields:
- **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
- **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
`return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
`None`).
- **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
- **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`.
- **image_grid_thw** -- List of image 3D grid in LLM. Returned when `images` is not `None`.
- **video_grid_thw** -- List of video 3D grid in LLM. Returned when `videos` is not `None`.
- **second_per_grid_ts** -- List of video seconds per time grid. Returned when `videos` is not `None`.
"""
output_kwargs = self._merge_kwargs(
Qwen2_5_VLProcessorKwargs,
tokenizer_init_kwargs=self.tokenizer.init_kwargs,
**kwargs,
)
image_inputs = videos_inputs = {}
if images is not None:
image_inputs = self.image_processor(images=images, **output_kwargs["images_kwargs"])
image_grid_thw = image_inputs["image_grid_thw"]
if videos is not None:
fps = output_kwargs["videos_kwargs"].get("fps", 2.0)
videos_inputs = self.video_processor(videos=videos, **output_kwargs["videos_kwargs"])
video_grid_thw = videos_inputs["video_grid_thw"]
if isinstance(fps, (int, float)):
second_per_grid_ts = [self.video_processor.temporal_patch_size / fps] * len(video_grid_thw)
elif hasattr(fps, "__len__") and len(fps) == len(video_grid_thw):
second_per_grid_ts = [self.video_processor.temporal_patch_size / tmp for tmp in fps]
else:
raise ValueError(
f"The length of fps ({len(fps) if hasattr(fps, '__len__') else fps}) must be equal to the length of video_grid_thw ({len(video_grid_thw)}) or fps should be a single number."
)
videos_inputs.update({"second_per_grid_ts": second_per_grid_ts})
if not isinstance(text, list):
text = [text]
text = text.copy() # below lines change text in-place
if images is not None:
merge_length = self.image_processor.merge_size**2
index = 0
for i in range(len(text)):
while self.image_token in text[i]:
num_image_tokens = image_grid_thw[index].prod() // merge_length
text[i] = text[i].replace(self.image_token, '<|temp_placeholder|>' * num_image_tokens, 1)
index += 1
text[i] = text[i].replace('<|temp_placeholder|>', self.image_token)
if videos is not None:
merge_length = self.video_processor.merge_size**2
index = 0
for i in range(len(text)):
while self.video_token in text[i]:
num_video_tokens = video_grid_thw[index].prod() // merge_length
text[i] = text[i].replace(self.video_token, '<|temp_placeholder|>' * num_video_tokens, 1)
index += 1
text[i] = text[i].replace('<|temp_placeholder|>', self.video_token)
return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", None)
# from IPython import embed; embed()
text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"])
self._check_special_mm_tokens(text, text_inputs, modalities=["image", "video"])
if return_mm_token_type_ids:
array_ids = np.array(text_inputs["input_ids"])
mm_token_type_ids = np.zeros_like(text_inputs["input_ids"])
mm_token_type_ids[array_ids == self.image_token_id] = 1
text_inputs["mm_token_type_ids"] = mm_token_type_ids.tolist()
return BatchFeature(data={**text_inputs, **image_inputs, **videos_inputs}, tensor_type=return_tensors)
# @property
# def model_input_names(self):
# tokenizer_input_names = self.tokenizer.model_input_names
# image_processor_input_names = self.image_processor.model_input_names
# names_from_processor = list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))
# return names_from_processor + ["second_per_grid_ts"]
__all__ = ["OpenCUAProcessor"]