# HyperCLOVAX-SEED-Think-32B — processing_hyperclovax_omni.py
# Uploaded to the Hugging Face Hub via huggingface_hub (revision 2e82d70, verified).
import copy
import json
import math
import os
import PIL
from PIL import Image
from typing import Dict, List, Optional, Union
import numpy as np
import torch
from PIL import Image
import re
from torchvision.transforms.functional import to_tensor
from transformers import (
AutoTokenizer,
AutoFeatureExtractor,
AutoImageProcessor,
AutoVideoProcessor,
Qwen2_5_VLProcessor,
Qwen2AudioProcessor,
WhisperFeatureExtractor,
)
from transformers.audio_utils import AudioInput
from transformers.image_processing_utils import (
BaseImageProcessor,
BatchFeature,
get_size_dict,
)
from transformers.image_transforms import (
convert_to_rgb,
get_resize_output_image_size,
resize,
to_channel_dimension_format,
)
from transformers.image_utils import (
ImageInput,
)
from transformers.models.qwen2_5_vl.processing_qwen2_5_vl import (
Qwen2_5_VLProcessorKwargs,
)
from transformers.processing_utils import (
ProcessingKwargs, ProcessorMixin, SpecificProcessorType, Unpack,
)
from transformers.tokenization_utils_base import PreTokenizedInput, TextInput
from transformers.utils import TensorType, logging
from transformers.video_utils import VideoInput
from typing_extensions import Unpack
# Module-level logger, following the transformers logging convention.
logger = logging.get_logger(__name__)
class HyperCLOVAXOmniProcessorKwargs(ProcessingKwargs, total=False):
    """Keyword-argument schema for :class:`HyperCLOVAXOmniProcessor`.

    Extends ``ProcessingKwargs`` with per-modality default dicts. Only the
    audio branch declares concrete values; image/video kwargs fall back to
    their respective processors' own defaults.
    """
    _defaults = {
        "audio_kwargs": {
            # Expected waveform sampling rate in Hz.
            "sample_rate": 16_000,
            # Chunking unit in seconds (multiplied by sample_rate downstream).
            "chunk_unit": 80,
            # Minimum number of samples a (trailing) chunk may contain.
            "min_chunk_size": 1_600,
        },
        "images_kwargs": {
        },
        "videos_kwargs": {
        },
    }
class HyperCLOVAXOmniProcessor(ProcessorMixin):
    """Multimodal processor bundling audio, image and video sub-processors
    with a tokenizer behind the standard ``ProcessorMixin`` interface.

    Each sub-processor is optional; the placeholder-expansion logic in
    ``__call__`` only activates the modalities whose processor is present.
    """
    # Attribute names managed by ProcessorMixin; order matters because
    # __init__ forwards them positionally to ProcessorMixin.__init__.
    attributes = [
        "audio_processor",
        # "discrete_audio_processor",
        # "discrete_image_processor",
        "image_processor",
        "video_processor",
        "tokenizer",
    ]
    audio_processor_class = "AutoFeatureExtractor"
    # discrete_audio_processor_class = "AutoImageProcessor"
    # discrete_image_processor_class = "AutoImageProcessor"
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = ("PreTrainedTokenizer", "PreTrainedTokenizerFast")
    video_processor_class = "AutoVideoProcessor"
    def __init__(
        self,
        audio_processor: Optional[AutoFeatureExtractor] = None,
        chat_template: Optional[str] = None,
        image_processor: Optional[AutoImageProcessor] = None,
        video_processor: Optional[AutoVideoProcessor] = None,
        tokenizer: AutoTokenizer = None,
        **kwargs,
    ):
        """Build the omni processor from optional per-modality sub-processors.

        Args:
            audio_processor: Feature extractor exposing ``audio_token`` and
                ``discrete_audio_token``; audio support is disabled when None.
            chat_template: Jinja chat template; falls back to the tokenizer's.
            image_processor: Image processor exposing ``image_token``,
                ``discrete_image_token`` and ``discrete_image_ratios``.
            video_processor: Video processor exposing ``video_token``.
            tokenizer: Tokenizer used to resolve placeholder token ids.
            **kwargs: Accepted for compatibility; not used in this body.
        """
        # Prefer explicit chat_template; fall back to tokenizer's if available
        if chat_template is None and hasattr(tokenizer, "chat_template"):
            chat_template = tokenizer.chat_template
        # Call the shared mixin directly with all declared attributes, including audio
        ProcessorMixin.__init__(
            self,
            audio_processor,
            image_processor,
            video_processor,
            tokenizer,
            chat_template=chat_template,
        )
        # Modalities registered here are later passed to _check_special_mm_tokens.
        self.modalities = list()
        if self.audio_processor is not None:
            # Single-token placeholders users put in prompts; expanded in __call__.
            self.audio_placeholder = f'{self.audio_processor.audio_token}'
            self.discrete_audio_placeholder = f'{self.audio_processor.discrete_audio_token}'
            # self.audio_placeholder = f'{self.audio_processor.audio_start_token}{self.audio_processor.audio_token}{self.audio_processor.audio_end_token}'
            # self.discrete_audio_placeholder = f'{self.audio_processor.discrete_audio_start_token}{self.audio_processor.discrete_audio_token}{self.audio_processor.discrete_audio_end_token}'
            self.audio_token = self.audio_processor.audio_token
            # Prefer a tokenizer-declared id; otherwise resolve via vocab lookup.
            self.audio_token_id = (
                tokenizer.audio_token_id
                if getattr(tokenizer, "audio_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.audio_processor.audio_token)
            )
            # self.modalities.append("audio")  # skip to check mm_validation
        if self.image_processor is not None:
            self.image_placeholder = f'{self.image_processor.image_token}'
            self.discrete_image_placeholder = f'{self.image_processor.discrete_image_token}'
            # self.image_placeholder = f'{self.image_processor.image_start_token}{self.image_processor.image_token}{self.image_processor.image_end_token}'
            # self.discrete_image_placeholder = f'{self.image_processor.discrete_image_start_token}{self.image_processor.discrete_image_token}{self.image_processor.discrete_image_end_token}'
            self.image_token = self.image_processor.image_token
            self.image_token_id = (
                tokenizer.image_token_id
                if getattr(tokenizer, "image_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.image_processor.image_token)
            )
            # Map each (i, j) aspect-ratio pair to its special ratio token string.
            self.discrete_image_ratio_tokens = {
                tuple(_discrete_image_ratio): f'<|vision_ratio_{_discrete_image_ratio[0]}:{_discrete_image_ratio[1]}|>'
                for _discrete_image_ratio in self.image_processor.discrete_image_ratios
            }
            self.modalities.append("image")
        if self.video_processor is not None:
            self.video_placeholder = f'{self.video_processor.video_token}'
            # self.video_placeholder = f'{self.video_processor.video_start_token}{self.video_processor.video_token}{self.video_processor.video_end_token}'
            self.video_token = self.video_processor.video_token
            self.video_token_id = (
                tokenizer.video_token_id
                if getattr(tokenizer, "video_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.video_processor.video_token)
            )
            self.modalities.append("video")
@classmethod
def from_pretrained(
cls: type[SpecificProcessorType],
pretrained_model_name_or_path: Union[str, os.PathLike],
**kwargs,
):
audio_processer_kwargs = kwargs.pop("audio_processor_kwargs", dict())
iamge_processer_kwargs = kwargs.pop("image_processor_kwargs", dict())
video_processer_kwargs = kwargs.pop("video_processor_kwargs", dict())
if "tokenizer" not in kwargs:
kwargs["tokenizer"] = AutoTokenizer.from_pretrained(
pretrained_model_name_or_path,
**kwargs,
)
audio_processor = None
try:
audio_processor = AutoFeatureExtractor.from_pretrained(
pretrained_model_name_or_path,
subfolder="audio",
**audio_processer_kwargs,
**kwargs,
)
except Exception as ex:
pass
image_processor = None
try:
image_processor = AutoImageProcessor.from_pretrained(
pretrained_model_name_or_path,
subfolder="image",
**iamge_processer_kwargs,
**kwargs,
)
except Exception as ex:
pass
video_processor = None
try:
video_processor = AutoVideoProcessor.from_pretrained(
pretrained_model_name_or_path,
subfolder="video",
**video_processer_kwargs,
**kwargs,
)
except Exception as ex:
pass
return super().from_pretrained(
pretrained_model_name_or_path=pretrained_model_name_or_path,
audio_processor=audio_processor,
image_processor=image_processor,
video_processor=video_processor,
**kwargs,
)
    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        *args,
        **kwargs,
    ):
        """Save the processor, temporarily hiding an absent audio processor.

        ``ProcessorMixin.save_pretrained`` iterates ``cls.attributes``; when no
        audio processor is attached, "audio_processor" is removed from the
        class-level list for the duration of the call and restored afterwards.
        The tokenizer's chat template and extra special tokens are also
        persisted so the tokenizer behaves identically when loaded alone.
        """
        original_attributes = list(self.__class__.attributes)
        try:
            audio_processor = getattr(self, "audio_processor", None)
            if audio_processor is None and "audio_processor" in self.__class__.attributes:
                # NOTE(review): this mutates class (not instance) state; not
                # safe under concurrent save_pretrained calls — confirm.
                self.__class__.attributes = [a for a in self.__class__.attributes if a != "audio_processor"]
            # IMPORTANT: keep chat_template aligned with the (possibly custom) tokenizer's template.
            # If we don't do this before `super().save_pretrained`, the base processor may save
            # its own default Qwen template into `chat_template.jinja`, causing inconsistency.
            try:
                tok = getattr(self, "tokenizer", None)
                ct = getattr(tok, "chat_template", None) if tok is not None else None
                if isinstance(ct, str) and ct:
                    self.chat_template = ct
            except Exception:
                pass
            self.register_for_auto_class()
            super().save_pretrained(save_directory, *args, **kwargs)
        finally:
            # Always restore the class-level attribute list.
            self.__class__.attributes = original_attributes
        # Persist chat_template into tokenizer_config.json so that loading the tokenizer alone
        # (AutoTokenizer.from_pretrained) keeps the same template behavior.
        try:
            chat_template = getattr(self, "chat_template", None)
            tokenizer_cfg_path = os.path.join(save_directory, "tokenizer_config.json")
            if isinstance(chat_template, str) and chat_template and os.path.exists(tokenizer_cfg_path):
                with open(tokenizer_cfg_path, "r", encoding="utf-8") as f:
                    tokenizer_cfg = json.load(f)
                tokenizer_cfg["chat_template"] = chat_template
                # Also persist HCX's extra special token name->token mapping (used by vLLM integration/tests).
                # Some tokenizers expose this as `tokenizer.extra_special_tokens` (a dict) but do not save it by default.
                extra_map = getattr(getattr(self, "tokenizer", None), "extra_special_tokens", None)
                if not isinstance(extra_map, dict):
                    extra_map = {}
                # Ensure at least the canonical multimodal tokens are present.
                extra_map.setdefault("image_token", "<|IMAGE_PAD|>")
                extra_map.setdefault("video_token", "<|VIDEO_PAD|>")
                tokenizer_cfg["extra_special_tokens"] = extra_map
                with open(tokenizer_cfg_path, "w", encoding="utf-8") as f:
                    json.dump(tokenizer_cfg, f, ensure_ascii=False, indent=2)
        except Exception:
            # Best-effort: failing to write the chat template shouldn't break saving.
            pass
        # Mirror the audio processor config as a sibling JSON file; remove a
        # stale one when no audio processor is attached.
        audio_config_path = os.path.join(save_directory, "audio_preprocessor_config.json")
        if getattr(self, "audio_processor", None) is not None:
            with open(audio_config_path, "w", encoding="utf-8") as f:
                json.dump(self.audio_processor.to_dict(), f, ensure_ascii=False, indent=2)
        elif os.path.exists(audio_config_path):
            os.remove(audio_config_path)
    def __call__(
        self,
        text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
        audios: AudioInput | None = None,
        images: ImageInput | None = None,
        videos: VideoInput | None = None,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> BatchFeature:
        """
        Prepare one or several sequence(s) plus optional audio/image/video inputs for the model.

        `text` and text-related `kwargs` are forwarded to the tokenizer. Each modality is first
        run through its sub-processor (continuous features) and, for audio/image, through an
        additional "discrete" preprocessing path; the corresponding placeholder tokens found in
        `text` are then expanded to the exact number of multimodal tokens each sample produces.

        Args:
            text (`str`, `list[str]`, `list[list[str]]`):
                The sequence or batch of sequences to be encoded, containing modality
                placeholder tokens (e.g. the processor's audio/image/video tokens).
            audios (`np.ndarray`, `list[np.ndarray]`, `list[list[np.ndarray]]`, *optional*):
                Raw waveforms; a flat list of arrays is treated as a single sample.
            images (`PIL.Image.Image`, `list[PIL.Image.Image]`, `list[list[PIL.Image.Image]]`, *optional*):
                Images; a flat list of PIL images is treated as a single sample.
            videos (`np.ndarray`, `list[np.ndarray]`, `list[list[np.ndarray]]`, *optional*):
                Videos; a flat list of frame arrays is treated as a single sample.
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:
                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.
        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:
            - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model.
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
            - **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`.
            - **image_grid_thw** -- List of image 3D grid in LLM. Returned when `images` is not `None`.
            - **video_grid_thw** -- List of video 3D grid in LLM. Returned when `videos` is not `None`.
            - plus the discrete audio/image tensors and query lengths when those modalities are present.
        """
        output_kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        # [Text Processing] (Placeholder Replacement)
        if text is None:
            pass
        else:
            if isinstance(text, str):
                text = [text, ]
            # below lines change text in-place
            text = copy.deepcopy(text)
        # [Audio Processing]
        audio_inputs = dict()
        discrete_audio_inputs = dict()
        if (
            audios is not None
            and self.audio_processor is not None
        ):
            if (
                len(audios) > 0
                and isinstance(audios[0], np.ndarray)
            ):  # sample to batch if a single item is given
                audios = [audios, ]
            # continuous
            audio_inputs = self._process_continuous_audio(
                audios=audios,
            )
            # discrete
            discrete_audio_inputs = self._process_discrete_audio(
                audios=audios,
                **output_kwargs["audio_kwargs"],
            )
        # [Image Processing]
        image_inputs, image_grid_thw = dict(), list()
        discrete_image_inputs, discrete_image_ratios = dict(), list()
        if (
            images is not None
            and self.image_processor is not None
        ):
            if (
                len(images) > 0
                and isinstance(images[0], PIL.Image.Image)
            ):  # sample to batch if a single item is given
                images = [images, ]
            # continuous: run the image processor per sample, then collate
            # per-key lists into stacked tensors where possible.
            image_inputs = dict()
            for _images in images:
                _image_inputs = self.image_processor(
                    images=_images,
                    **output_kwargs["images_kwargs"],
                )
                _image_grid_thw = _image_inputs["image_grid_thw"]
                for _k, _v in _image_inputs.items():
                    if _k not in image_inputs:
                        image_inputs[_k] = list()
                    image_inputs[_k].append(_v)
                image_grid_thw.append(_image_grid_thw)
            for _k, _v in image_inputs.items():
                if isinstance(_v[0], torch.Tensor):
                    image_inputs[_k] = torch.stack(_v, dim=0)
            # discrete
            discrete_image_inputs = self._process_discrete_images(
                images=images,
            )
            discrete_image_ratios = discrete_image_inputs["discrete_image_ratios"]
        # [Video Processing]
        video_inputs, video_grid_thw = dict(), list()
        if (
            videos is not None
            and self.video_processor is not None
        ):
            if (
                len(videos) > 0
                and isinstance(videos[0], np.ndarray)
            ):  # sample to batch if a single item is given
                videos = [videos, ]
            # Video feature extraction
            video_inputs = dict()
            video_grid_thw = list()
            for _videos in videos:
                _video_inputs = self.video_processor(
                    videos=_videos,
                    **output_kwargs["videos_kwargs"],
                )
                _video_grid_thw = _video_inputs["video_grid_thw"]
                for _k, _v in _video_inputs.items():
                    if _k not in video_inputs:
                        video_inputs[_k] = list()
                    video_inputs[_k].append(_v)
                video_grid_thw.append(_video_grid_thw)
            video_inputs = {
                _k: torch.stack(_v, dim=0)
                if isinstance(_v[0], torch.Tensor) else _v
                for _k, _v in video_inputs.items()
            }
        # [Expansion] - Audio: replace each audio placeholder with the exact
        # number of audio tokens that sample/index produces; a preceding
        # discrete-audio placeholder is folded into the same replacement.
        if (
            text is not None
            and audio_inputs
        ):
            for _sample_idx, (_text_before, _audio_query_lengths, _discrete_audio_query_lengths) in enumerate(zip(
                text, audio_inputs["audio_query_lengths"], discrete_audio_inputs["discrete_audio_query_lengths"],
            )):
                _find_iters = list(re.finditer(re.escape(self.audio_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_audio_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_audio_match.start()
                        _inplace_str = self.get_audio_token_replacement(
                            audio_query_length=_audio_query_lengths[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )
                        _discrete_audio_match = re.search(re.escape(self.discrete_audio_placeholder), _text_before[_prev_end_idx:_continuous_audio_match.start()])
                        if _discrete_audio_match:
                            # NOTE(review): .start() is relative to the slice
                            # starting at _prev_end_idx, not to _text_before;
                            # this looks off by _prev_end_idx — confirm.
                            _cur_start_idx = _discrete_audio_match.start()
                            _discrete_inplace_str = self.get_discrete_audio_token_replacement(
                                discrete_audio_query_length=_discrete_audio_query_lengths[_idx],
                                include_boundary_tokens=True,
                                tokenize=False,
                            )
                            _inplace_str = f'{_discrete_inplace_str}{_inplace_str}'
                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_audio_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after
        # [Expansion] - Image: same scheme as audio, sized by image_grid_thw.
        if (
            text is not None
            and image_inputs
        ):
            for _sample_idx, (_text_before, _image_grid_thw, _discrete_image_ratios) in enumerate(zip(
                text, image_inputs["image_grid_thw"], discrete_image_inputs["discrete_image_ratios"],
            )):
                _find_iters = list(re.finditer(re.escape(self.image_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_image_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_image_match.start()
                        _inplace_str = self.get_image_token_replacement(
                            image_grid_thw=_image_grid_thw[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )
                        _discrete_image_match = re.search(re.escape(self.discrete_image_placeholder), _text_before[_prev_end_idx:_continuous_image_match.start()])
                        if _discrete_image_match:
                            # NOTE(review): same slice-relative .start() concern
                            # as the audio branch above — confirm.
                            _cur_start_idx = _discrete_image_match.start()
                            _discrete_inplace_str = self.get_discrete_image_token_replacement(
                                discrete_image_ratio=_discrete_image_ratios[_idx],
                                include_boundary_tokens=True,
                                tokenize=False,
                            )
                            _inplace_str = f'{_discrete_inplace_str}{_inplace_str}'
                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_image_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after
        # [Expansion] - Video: no discrete counterpart for videos.
        if (
            text is not None
            and video_inputs
        ):
            for _sample_idx, (_text_before, _video_grid_thw) in enumerate(zip(
                text, video_inputs["video_grid_thw"]
            )):
                _find_iters = list(re.finditer(re.escape(self.video_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_video_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_video_match.start()
                        _inplace_str = self.get_video_token_replacement(
                            video_grid_thw=_video_grid_thw[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )
                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_video_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after
        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
        return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", False)
        text_inputs = dict()
        if text is not None:
            # Tokenize with return_tensors=None; tensor conversion happens once
            # at the BatchFeature step below.
            text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"], return_tensors=None)
            self._check_special_mm_tokens(
                text,
                text_inputs,
                modalities=self.modalities,
            )
            if return_mm_token_type_ids:
                array_ids = np.array(text_inputs["input_ids"])
                mm_token_type_ids = np.zeros_like(text_inputs["input_ids"])
                # NOTE(review): reads image_token_id from the image processor;
                # __init__ resolves self.image_token_id from the tokenizer —
                # verify both attributes agree.
                mm_token_type_ids[array_ids == self.image_processor.image_token_id] = 1
                if text_inputs:
                    text_inputs["mm_token_type_ids"] = mm_token_type_ids.tolist()
        data = {
            **text_inputs,
            **image_inputs,
            **video_inputs,
            **discrete_image_inputs,
            **audio_inputs,
            **discrete_audio_inputs,
        }
        # discrete_image_ratios is kept out of tensor conversion.
        _tensorable_data, _untensorable_data = dict(), dict()
        for _k, _v in data.items():
            if _k in [
                "discrete_image_ratios",
            ]:
                _untensorable_data[_k] = _v
            else:
                _tensorable_data[_k] = _v
        model_inputs = BatchFeature(data=_tensorable_data, tensor_type=return_tensors)
        model_inputs.update(_untensorable_data)
        return model_inputs
    def _process_continuous_audio(
        self,
        audios: Union[List[np.ndarray], List[List[np.ndarray]]],
        sample_rate: int = 16_000,
        chunk_unit: int = 80,
        min_chunk_size: int = 1_600,
        return_tensors: Optional[bool] = None,
    ):
        """Continuous audio preprocessing via the Whisper-style feature extractor.

        Each waveform is split into 30-second chunks, converted to padded mel
        features with an attention mask, and the resulting encoder query length
        is derived from two stride-2 reductions of the valid-frame count.

        Returns a dict with ``audio_values`` (batch of mel features),
        ``audio_masks`` and ``audio_query_lengths``.

        NOTE(review): sample_rate / chunk_unit / min_chunk_size / return_tensors
        are accepted but unused here (the extractor's own sampling_rate is
        used) — confirm intended.
        """
        # Promote a flat list of waveforms to a single-sample batch.
        if (
            len(audios) > 0
            and isinstance(audios[0], np.ndarray)
        ):
            audios = [audios, ]
        audio_values, audio_masks, audio_query_lengths = list(), list(), list()
        for _audios in audios:
            _audio_values, _audio_masks, _audio_query_lengths = list(), list(), list()
            if len(_audios) == 0:
                # Empty sample: zero-sized feature tensors, single zero length.
                # NOTE(review): shapes (0, 128, 3000)/(0, 3000) hard-code the
                # extractor's mel-bin/frame counts — confirm against config.
                _audio_values = torch.zeros(0, 128, 3000)
                _audio_masks = torch.zeros(0, 3000)
                _audio_query_lengths = [0, ]
            else:
                for _audio in _audios:
                    # Split into 30 s chunks at the extractor's sampling rate.
                    chunks = []
                    for i in range(0, len(_audio), 30 * self.audio_processor.sampling_rate):
                        chunks.append(_audio[i : i + 30 * self.audio_processor.sampling_rate])
                    num_of_chunks = len(chunks)
                    preprocess_results = self.audio_processor(
                        chunks,
                        sampling_rate=self.audio_processor.sampling_rate,
                        return_attention_mask=True,
                        padding="max_length"
                    )
                    _audio_value = preprocess_results.input_features
                    _audio_mask = preprocess_results.attention_mask
                    if isinstance(_audio_value, list):
                        _audio_value = np.array(_audio_value)
                    if isinstance(_audio_mask, list):
                        _audio_mask = np.array(_audio_mask)
                    # Two successive stride-2 reductions of the valid length.
                    input_lengths = int(_audio_mask.sum())
                    input_lengths = (input_lengths - 1) // 2 + 1
                    output_lengths = (input_lengths - 2) // 2 + 1
                    _audio_values.append(torch.Tensor(_audio_value))
                    _audio_masks.append(torch.Tensor(_audio_mask))
                    _audio_query_lengths.append(output_lengths)
                _audio_values = torch.cat(_audio_values, dim=0)
                _audio_masks = torch.cat(_audio_masks, dim=0)
            _audio_query_lengths = torch.tensor(_audio_query_lengths)
            audio_values.append(_audio_values)
            audio_masks.append(_audio_masks)
            audio_query_lengths.append(_audio_query_lengths)
        # NOTE(review): stacking requires every sample to yield identically
        # shaped chunk tensors — confirm callers guarantee this.
        audio_values = torch.stack(audio_values, dim=0)
        audio_masks = torch.stack(audio_masks, dim=0)
        audio_query_lengths = torch.stack(audio_query_lengths, dim=0)
        return {
            "audio_values": audio_values,
            "audio_masks": audio_masks,
            "audio_query_lengths": audio_query_lengths,
        }
def _process_discrete_audio(
self,
audios: Union[List[np.ndarray], List[List[np.ndarray]]],
sample_rate: int = 16_000,
chunk_unit: int = 80,
min_chunk_size: int = 1_600,
return_tensors: Optional[bool] = None,
):
"""Discrete Audio Preprocessing"""
if (
len(audios) > 0
and isinstance(audios[0], np.ndarray)
):
audios = [audios, ]
discrete_audio_values, discrete_audio_query_lengths = list(), list()
for _audios in audios:
_discrete_audio_values, _discrete_audio_query_lengths = list(), list()
for _audio in _audios:
audio_length = len(_audio)
max_audio_length = 600 * sample_rate
audio_duration_sec = audio_length / sample_rate
if audio_length < min_chunk_size:
raise ValueError(f"Discrete audio too short: {audio_length}")
if np.isnan(_audio).any() or np.isinf(_audio).any():
raise ValueError("Discrete audio contains NaN/Inf")
if audio_length > max_audio_length:
raise ValueError(f"Discrete audio too long: {audio_length} samples = ({audio_duration_sec:.2f}s > 600s)")
audio_min, audio_max = _audio.min().item(), _audio.max().item()
if audio_min < -100.0 or audio_max > 100.0:
raise ValueError(f"Discrete audio values out of range: min {audio_min}, max {audio_max}")
_audio_query_length = None
if audio_length > chunk_unit * sample_rate:
total_code_len = 0
chunk_size = chunk_unit * sample_rate
for start in range(0, audio_length, chunk_size):
end = min(start + chunk_size, audio_length)
if end < audio_length and audio_length - end < min_chunk_size:
end = audio_length
chunk_len = end - start
mel_len = chunk_len // 160
after_conv1 = (mel_len + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
code_len = (after_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
total_code_len += code_len
if end >= audio_length:
break
_audio_query_length = total_code_len
else:
mel_len = audio_length // 160
after_conv1 = (mel_len + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
code_len = (after_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
_audio_query_length = code_len
_discrete_audio_values.append(torch.tensor(_audio))
_discrete_audio_query_lengths.append(_audio_query_length)
_discrete_audio_values = _discrete_audio_values = torch.stack(_discrete_audio_values, dim=0)
_discrete_audio_query_lengths = torch.tensor(_discrete_audio_query_lengths)
discrete_audio_values.append(_discrete_audio_values)
discrete_audio_query_lengths.append(_discrete_audio_query_lengths)
discrete_audio_values = torch.stack(discrete_audio_values, dim=0)
discrete_audio_query_lengths = torch.stack(discrete_audio_query_lengths, dim=0)
return {
"discrete_audio_values": discrete_audio_values,
"discrete_audio_query_lengths": discrete_audio_query_lengths,
}
def _process_discrete_images(
self,
images: Union[List[PIL.Image.Image], List[List[PIL.Image.Image]]],
return_tensors: Optional[bool] = None,
):
"""Discrete Image Preprocessing"""
if (
len(images) > 0
and isinstance(images[0], PIL.Image.Image)
):
images = [images, ]
discrete_pixel_values, image_ratios = list(), list()
for _images in images:
_discrete_pixel_values, _image_ratios = list(), list()
for _image in _images:
w, h = _image.size
_img_ratio = self._find_best_ratio_token([h, w])
_discrete_pixel_value = _image.resize((384, 384), Image.BICUBIC)
_discrete_pixel_tensor = to_tensor(_discrete_pixel_value)
_discrete_pixel_tensor = _discrete_pixel_tensor.squeeze(dim=0)
_discrete_pixel_values.append(_discrete_pixel_tensor)
_img_ratio = torch.tensor(_img_ratio)
_image_ratios.append(_img_ratio)
_discrete_pixel_values = torch.stack(_discrete_pixel_values, dim=0)
_image_ratios = torch.stack(_image_ratios, dim=0)
discrete_pixel_values.append(_discrete_pixel_values)
image_ratios.append(_image_ratios)
discrete_pixel_values = torch.stack(discrete_pixel_values, dim=0)
image_ratios = torch.stack(image_ratios, dim=0)
return {
"discrete_pixel_values": discrete_pixel_values,
"discrete_image_ratios": image_ratios,
}
def _find_best_ratio_token(
self,
original_size: List[int],
):
"""Find the best ratio token based on original_size"""
base_ratios = list(self.discrete_image_ratio_tokens.keys())
vision_aspect_ratios = [r for ratio in base_ratios for r in [ratio, ratio[::-1]]][1:] # 13 ratios total
if not isinstance(original_size, list) or len(original_size) != 2:
return self.discrete_image_ratio_tokens[(1, 1)]
h, w = original_size
if h == 0 or w == 0:
return self.discrete_image_ratio_tokens[(1, 1)]
ratios = [i / j for i, j in vision_aspect_ratios]
best_size_idx = np.argmin([abs(w / h - r) for r in ratios])
i, j = vision_aspect_ratios[best_size_idx]
return (i, j)
def get_num_audio_tokens(
self,
audio_masks: torch.Tensor,
**kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
) -> int:
kwargs = self._merge_kwargs(
HyperCLOVAXOmniProcessorKwargs,
tokenizer_init_kwargs=self.tokenizer.init_kwargs,
**kwargs,
)
def _compute_num_audio_tokens(audio_mask: torch.Tensor,):
"""
audio_mask: shape (N, )
"""
input_length = (int(audio_mask.sum()) - 1) // 2 + 1
num_audio_tokens = (input_length - 2) // 2 + 1
return num_audio_tokens
if len(audio_masks.shape) == 1:
num_audio_tokens = _compute_num_audio_tokens(audio_mask=audio_masks)
else: # len(audio_masks.shape) == 2
num_audio_tokens = sum([
_compute_num_audio_tokens(audio_mask=_audio_mask)
for _audio_mask in audio_masks
])
# num_audio_tokens += 2 # <|audio_start|>, <|audio_end|>
return num_audio_tokens
def get_num_discrete_audio_tokens(
self,
discrete_audio_values: Optional[torch.Tensor] = None,
**kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
) -> int:
kwargs = self._merge_kwargs(
HyperCLOVAXOmniProcessorKwargs,
tokenizer_init_kwargs=self.tokenizer.init_kwargs,
**kwargs,
)
audio_length = len(discrete_audio_values)
num_audio_tokens = 0
chunk_size = kwargs["audio_kwargs"].get("chunk_unit", 80) * kwargs["audio_kwargs"].get("sample_rate", 16_000)
for _start in range(0, audio_length, chunk_size):
_end = min(_start + chunk_size, audio_length)
_chunked_length = _end - _start
_num_mel_frames = _chunked_length // 160
_num_mel_frames_conv1 = (_num_mel_frames + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
_num_audio_tokens = (_num_mel_frames_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
num_audio_tokens += _num_audio_tokens
# num_audio_tokens += 2 # <|discrete_audio_start|>, <|discrete_audio_end|>
return num_audio_tokens
def get_num_image_tokens(
self,
image_width: Optional[int] = None,
image_height: Optional[int] = None,
pixel_values: Optional[torch.Tensor] = None,
**kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
) -> int:
kwargs = self._merge_kwargs(
HyperCLOVAXOmniProcessorKwargs,
tokenizer_init_kwargs=self.tokenizer.init_kwargs,
**kwargs,
)
image_processor_merge_size = 2
if self.image_processor is not None:
image_processor_merge_size = getattr(self.image_processor, "merge_size", 2)
num_image_tokens = None
if pixel_values is None:
images_kwargs = Qwen2_5_VLProcessorKwargs._defaults.get("images_kwargs", {})
images_kwargs.update(kwargs["images_kwargs"])
num_image_patches = self.image_processor.get_number_of_image_patches(
image_height, image_width, images_kwargs,
)
num_image_tokens = num_image_patches // (image_processor_merge_size ** 2)
elif len(pixel_values.shape) == 2:
num_image_tokens = pixel_values.shape[0] // (image_processor_merge_size ** 2)
else: # len(pixel_values_videos.shape) == 3
num_image_tokens = sum([
_pixel_values.shape[0] // (image_processor_merge_size ** 2)
for _pixel_values in pixel_values
])
# num_image_tokens += 2 # <|image_start|>, <|image_end|>
return num_image_tokens
def get_num_discrete_image_tokens(
self,
**kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
) -> int:
kwargs = self._merge_kwargs(
HyperCLOVAXOmniProcessorKwargs,
tokenizer_init_kwargs=self.tokenizer.init_kwargs,
**kwargs,
)
discrete_token_size = self.image_processor.discrete_token_size
num_image_tokens = discrete_token_size ** 2 + discrete_token_size # <|vision_eol|>
# num_image_tokens += 3 # <|discrete_image_start|>, <|vision_eof|>, <|discrete_image_end|>
return num_image_tokens
def get_num_video_tokens(
self,
image_width: Optional[int] = None,
image_height: Optional[int] = None,
num_frames: Optional[int] = None,
pixel_values_videos: Optional[torch.Tensor] = None,
**kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
) -> int:
kwargs = self._merge_kwargs(
HyperCLOVAXOmniProcessorKwargs,
tokenizer_init_kwargs=self.tokenizer.init_kwargs,
**kwargs,
)
video_processor_merge_size = 2
if self.video_processor is not None:
video_processor_merge_size = getattr(self.video_processor, "merge_size", 2)
if not pixel_values_videos:
videos_kwargs = Qwen2_5_VLProcessorKwargs._defaults.get("videos_kwargs", {})
videos_kwargs.update(kwargs["videos_kwargs"])
num_video_patches = self.video_processor.get_num_of_video_patches(
num_frames, image_height, image_width, videos_kwargs,
)
num_video_tokens = num_video_patches // (video_processor_merge_size ** 2)
elif len(pixel_values_videos.shape) == 2:
num_video_tokens = pixel_values_videos.shape[0] // (video_processor_merge_size ** 2)
else: # len(pixel_values_videos.shape) == 3
num_video_tokens = sum([
_pixel_values_videos.shape[0] // (video_processor_merge_size ** 2)
for _pixel_values_videos in pixel_values_videos
])
# num_video_tokens += 2 # <|video_start|>, <|video_end|>
return num_video_tokens
def get_audio_token_replacement(
self,
audio_query_length: int,
include_boundary_tokens: Optional[bool] = True,
tokenize: Optional[bool] = False,
):
replacement = self.audio_processor.audio_token * int(audio_query_length)
if include_boundary_tokens:
replacement = f'{self.audio_processor.audio_start_token}{replacement}{self.audio_processor.audio_end_token}'
if tokenize:
replacement = self.tokenizer.encode(replacement)
return replacement
def get_discrete_audio_token_replacement(
self,
discrete_audio_query_length: Optional[int] = None,
include_boundary_tokens: Optional[bool] = True,
tokenize: Optional[bool] = False,
):
replacement = self.audio_processor.discrete_audio_token * int(discrete_audio_query_length)
if include_boundary_tokens:
replacement = f'{self.audio_processor.discrete_audio_start_token}{replacement}{self.audio_processor.discrete_audio_end_token}'
if tokenize:
replacement = self.tokenizer.encode(replacement)
return replacement
def get_image_token_replacement(
self,
image_grid_thw: List[int],
include_boundary_tokens: Optional[bool] = True,
tokenize: Optional[bool] = False,
):
merge_length = self.image_processor.merge_size ** 2
discrete_token_size = self.image_processor.discrete_token_size
_num_image_tokens = image_grid_thw.prod() // merge_length
replacement = self.image_processor.image_token * int(_num_image_tokens)
if include_boundary_tokens:
replacement = f'{self.image_processor.image_start_token}{replacement}{self.image_processor.image_end_token}'
if tokenize:
replacement = self.tokenizer.encode(replacement)
return replacement
def get_discrete_image_token_replacement(
self,
discrete_image_ratio: Optional[List[int]] = None,
include_boundary_tokens: Optional[bool] = True,
tokenize: Optional[bool] = False,
):
discrete_token_size = self.image_processor.discrete_token_size
_row_str = f'{(self.image_processor.discrete_image_token * discrete_token_size)}{self.image_processor.vision_eol_token}'
_discrete_image_ratio_token = self.discrete_image_ratio_tokens[(discrete_image_ratio[0], discrete_image_ratio[0])]
replacement = f'{_discrete_image_ratio_token}{(_row_str * discrete_token_size)}'
if include_boundary_tokens:
replacement = f'{self.image_processor.discrete_image_start_token}{replacement}{self.image_processor.discrete_image_end_token}'
if tokenize:
replacement = self.tokenizer.encode(replacement)
return replacement
def get_video_token_replacement(
self,
video_grid_thw: List[int],
include_boundary_tokens: Optional[bool] = True,
tokenize: Optional[bool] = False,
):
merge_length = self.video_processor.merge_size ** 2
_num_video_tokens = video_grid_thw.prod() // merge_length
replacement = self.video_processor.video_token * int(_num_video_tokens)
if include_boundary_tokens:
replacement = f'{self.video_processor.video_start_token}{replacement}{self.video_processor.video_end_token}'
if tokenize:
replacement = self.tokenizer.encode(replacement)
return replacement