import copy
import json
import math
import os
import re
from typing import Dict, List, Optional, Union

import numpy as np
import PIL
import torch
from PIL import Image
from torchvision.transforms.functional import to_tensor
from transformers import (
    AutoTokenizer,
    AutoFeatureExtractor,
    AutoImageProcessor,
    AutoVideoProcessor,
    Qwen2_5_VLProcessor,
    Qwen2AudioProcessor,
    WhisperFeatureExtractor,
)
from transformers.audio_utils import AudioInput
from transformers.image_processing_utils import (
    BaseImageProcessor,
    BatchFeature,
    get_size_dict,
)
from transformers.image_transforms import (
    convert_to_rgb,
    get_resize_output_image_size,
    resize,
    to_channel_dimension_format,
)
from transformers.image_utils import ImageInput
from transformers.models.qwen2_5_vl.processing_qwen2_5_vl import (
    Qwen2_5_VLProcessorKwargs,
)
from transformers.processing_utils import (
    ProcessingKwargs,
    ProcessorMixin,
    SpecificProcessorType,
)
from transformers.tokenization_utils_base import PreTokenizedInput, TextInput
from transformers.utils import TensorType, logging
from transformers.video_utils import VideoInput
from typing_extensions import Unpack

logger = logging.get_logger(__name__)


class HyperCLOVAXOmniProcessorKwargs(ProcessingKwargs, total=False):
    """Keyword-argument schema for :class:`HyperCLOVAXOmniProcessor`.

    Only the audio defaults are model-specific; image/video kwargs are passed
    through to the respective sub-processors unchanged.
    """

    _defaults = {
        "audio_kwargs": {
            "sample_rate": 16_000,
            "chunk_unit": 80,         # seconds per discrete-audio chunk
            "min_chunk_size": 1_600,  # minimum sample count (0.1 s @ 16 kHz)
        },
        "images_kwargs": {},
        "videos_kwargs": {},
    }


class HyperCLOVAXOmniProcessor(ProcessorMixin):
    """Omni-modal processor bundling audio/image/video sub-processors and a tokenizer.

    Each modality produces both a "continuous" representation (feature tensors)
    and, for audio/image, a "discrete" representation (codebook-style tokens).
    ``__call__`` expands single-placeholder tokens in ``text`` into the exact
    number of modality tokens the model expects.
    """

    attributes = [
        "audio_processor",
        # "discrete_audio_processor",
        # "discrete_image_processor",
        "image_processor",
        "video_processor",
        "tokenizer",
    ]
    audio_processor_class = "AutoFeatureExtractor"
    # discrete_audio_processor_class = "AutoImageProcessor"
    # discrete_image_processor_class = "AutoImageProcessor"
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = ("PreTrainedTokenizer", "PreTrainedTokenizerFast")
    video_processor_class = "AutoVideoProcessor"

    def __init__(
        self,
        audio_processor: Optional[AutoFeatureExtractor] = None,
        chat_template: Optional[str] = None,
        image_processor: Optional[AutoImageProcessor] = None,
        video_processor: Optional[AutoVideoProcessor] = None,
        tokenizer: AutoTokenizer = None,
        **kwargs,
    ):
        # Prefer an explicit chat_template; fall back to the tokenizer's if available.
        if chat_template is None and hasattr(tokenizer, "chat_template"):
            chat_template = tokenizer.chat_template
        # Call the shared mixin directly with all declared attributes, including audio.
        ProcessorMixin.__init__(
            self,
            audio_processor,
            image_processor,
            video_processor,
            tokenizer,
            chat_template=chat_template,
        )
        self.modalities = list()
        if self.audio_processor is not None:
            # Placeholder strings users put in raw text; expanded in __call__.
            self.audio_placeholder = f'{self.audio_processor.audio_token}'
            self.discrete_audio_placeholder = f'{self.audio_processor.discrete_audio_token}'
            self.audio_token = self.audio_processor.audio_token
            # Prefer a tokenizer-declared id; otherwise resolve the token string.
            self.audio_token_id = (
                tokenizer.audio_token_id
                if getattr(tokenizer, "audio_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.audio_processor.audio_token)
            )
            # self.modalities.append("audio")  # skip to check mm_validation
        if self.image_processor is not None:
            self.image_placeholder = f'{self.image_processor.image_token}'
            self.discrete_image_placeholder = f'{self.image_processor.discrete_image_token}'
            self.image_token = self.image_processor.image_token
            self.image_token_id = (
                tokenizer.image_token_id
                if getattr(tokenizer, "image_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.image_processor.image_token)
            )
            # Map (h, w) aspect-ratio pairs to their dedicated ratio tokens.
            self.discrete_image_ratio_tokens = {
                tuple(_ratio): f'<|vision_ratio_{_ratio[0]}:{_ratio[1]}|>'
                for _ratio in self.image_processor.discrete_image_ratios
            }
            self.modalities.append("image")
        if self.video_processor is not None:
            self.video_placeholder = f'{self.video_processor.video_token}'
            self.video_token = self.video_processor.video_token
            self.video_token_id = (
                tokenizer.video_token_id
                if getattr(tokenizer, "video_token_id", None)
                else tokenizer.convert_tokens_to_ids(self.video_processor.video_token)
            )
            self.modalities.append("video")

    @classmethod
    def from_pretrained(
        cls: type[SpecificProcessorType],
        pretrained_model_name_or_path: Union[str, os.PathLike],
        **kwargs,
    ):
        """Load the processor, resolving each sub-processor from its own subfolder.

        Missing sub-processors are tolerated (best-effort): a repo without e.g.
        a ``video`` subfolder simply yields ``video_processor=None``.
        """
        audio_processor_kwargs = kwargs.pop("audio_processor_kwargs", dict())
        image_processor_kwargs = kwargs.pop("image_processor_kwargs", dict())
        video_processor_kwargs = kwargs.pop("video_processor_kwargs", dict())
        if "tokenizer" not in kwargs:
            kwargs["tokenizer"] = AutoTokenizer.from_pretrained(
                pretrained_model_name_or_path,
                **kwargs,
            )
        audio_processor = None
        try:
            audio_processor = AutoFeatureExtractor.from_pretrained(
                pretrained_model_name_or_path,
                subfolder="audio",
                **audio_processor_kwargs,
                **kwargs,
            )
        except Exception:
            # Best-effort: the checkpoint may not ship an audio sub-processor.
            pass
        image_processor = None
        try:
            image_processor = AutoImageProcessor.from_pretrained(
                pretrained_model_name_or_path,
                subfolder="image",
                **image_processor_kwargs,
                **kwargs,
            )
        except Exception:
            pass
        video_processor = None
        try:
            video_processor = AutoVideoProcessor.from_pretrained(
                pretrained_model_name_or_path,
                subfolder="video",
                **video_processor_kwargs,
                **kwargs,
            )
        except Exception:
            pass
        return super().from_pretrained(
            pretrained_model_name_or_path=pretrained_model_name_or_path,
            audio_processor=audio_processor,
            image_processor=image_processor,
            video_processor=video_processor,
            **kwargs,
        )

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        *args,
        **kwargs,
    ):
        """Save the processor, keeping chat template and audio config consistent.

        Temporarily drops ``audio_processor`` from the class attribute list when
        this instance has none, so the base class does not try to save it.
        """
        original_attributes = list(self.__class__.attributes)
        try:
            audio_processor = getattr(self, "audio_processor", None)
            if audio_processor is None and "audio_processor" in self.__class__.attributes:
                self.__class__.attributes = [
                    a for a in self.__class__.attributes if a != "audio_processor"
                ]
            # IMPORTANT: keep chat_template aligned with the (possibly custom) tokenizer's template.
            # If we don't do this before `super().save_pretrained`, the base processor may save
            # its own default Qwen template into `chat_template.jinja`, causing inconsistency.
            try:
                tok = getattr(self, "tokenizer", None)
                ct = getattr(tok, "chat_template", None) if tok is not None else None
                if isinstance(ct, str) and ct:
                    self.chat_template = ct
            except Exception:
                pass
            self.register_for_auto_class()
            super().save_pretrained(save_directory, *args, **kwargs)
        finally:
            # Always restore the class-level attribute list (it is shared state).
            self.__class__.attributes = original_attributes
        # Persist chat_template into tokenizer_config.json so that loading the tokenizer alone
        # (AutoTokenizer.from_pretrained) keeps the same template behavior.
        try:
            chat_template = getattr(self, "chat_template", None)
            tokenizer_cfg_path = os.path.join(save_directory, "tokenizer_config.json")
            if (
                isinstance(chat_template, str)
                and chat_template
                and os.path.exists(tokenizer_cfg_path)
            ):
                with open(tokenizer_cfg_path, "r", encoding="utf-8") as f:
                    tokenizer_cfg = json.load(f)
                tokenizer_cfg["chat_template"] = chat_template
                # Also persist HCX's extra special token name->token mapping (used by vLLM integration/tests).
                # Some tokenizers expose this as `tokenizer.extra_special_tokens` (a dict) but do not save it by default.
                extra_map = getattr(getattr(self, "tokenizer", None), "extra_special_tokens", None)
                if not isinstance(extra_map, dict):
                    extra_map = {}
                # Ensure at least the canonical multimodal tokens are present.
                extra_map.setdefault("image_token", "<|IMAGE_PAD|>")
                extra_map.setdefault("video_token", "<|VIDEO_PAD|>")
                tokenizer_cfg["extra_special_tokens"] = extra_map
                with open(tokenizer_cfg_path, "w", encoding="utf-8") as f:
                    json.dump(tokenizer_cfg, f, ensure_ascii=False, indent=2)
        except Exception:
            # Best-effort: failing to write the chat template shouldn't break saving.
            pass
        # Write (or remove) the standalone audio feature-extractor config.
        audio_config_path = os.path.join(save_directory, "audio_preprocessor_config.json")
        if getattr(self, "audio_processor", None) is not None:
            with open(audio_config_path, "w", encoding="utf-8") as f:
                json.dump(self.audio_processor.to_dict(), f, ensure_ascii=False, indent=2)
        elif os.path.exists(audio_config_path):
            os.remove(audio_config_path)

    def __call__(
        self,
        text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None,
        audios: AudioInput | None = None,
        images: ImageInput | None = None,
        videos: VideoInput | None = None,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> BatchFeature:
        """
        Main method to prepare for the model one or several sequences(s) and image(s).

        Text placeholders (one per media item) are expanded in place into the exact
        number of modality tokens implied by the extracted features, then the text
        is tokenized alongside the audio/image/video tensors.

        Args:
            images (`PIL.Image.Image`, `np.ndarray`, `torch.Tensor`, `list[PIL.Image.Image]`, `list[np.ndarray]`, `list[torch.Tensor]`):
                The image or batch of images to be prepared. Each image can be a PIL image, NumPy array or PyTorch
                tensor. Both channels-first and channels-last formats are supported.
            text (`str`, `list[str]`, `list[list[str]]`):
                The sequence or batch of sequences to be encoded. Each sequence can be a string or a list of strings
                (pretokenized string). If the sequences are provided as list of strings (pretokenized), you must set
                `is_split_into_words=True` (to lift the ambiguity with a batch of sequences).
            videos (`np.ndarray`, `torch.Tensor`, `list[np.ndarray]`, `list[torch.Tensor]`):
                The image or batch of videos to be prepared. Each video can be a 4D NumPy array or PyTorch
                tensor, or a nested list of 3D frames. Both channels-first and channels-last formats are supported.
            return_tensors (`str` or [`~utils.TensorType`], *optional*):
                If set, will return tensors of a particular framework. Acceptable values are:
                - `'tf'`: Return TensorFlow `tf.constant` objects.
                - `'pt'`: Return PyTorch `torch.Tensor` objects.
                - `'np'`: Return NumPy `np.ndarray` objects.
                - `'jax'`: Return JAX `jnp.ndarray` objects.

        Returns:
            [`BatchFeature`]: A [`BatchFeature`] with the following fields:
            - **input_ids** -- List of token ids to be fed to a model. Returned when `text` is not `None`.
            - **attention_mask** -- List of indices specifying which tokens should be attended to by the model (when
              `return_attention_mask=True` or if *"attention_mask"* is in `self.model_input_names` and if `text` is not
              `None`).
            - **pixel_values** -- Pixel values to be fed to a model. Returned when `images` is not `None`.
            - **pixel_values_videos** -- Pixel values of videos to be fed to a model. Returned when `videos` is not `None`.
            - **image_grid_thw** -- List of image 3D grid in LLM. Returned when `images` is not `None`.
            - **video_grid_thw** -- List of video 3D grid in LLM. Returned when `videos` is not `None`.
        """
        output_kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        # [Text Processing] (Placeholder Replacement)
        if text is not None:
            if isinstance(text, str):
                text = [text, ]
            # The expansion below mutates items of `text`; work on a copy.
            text = copy.deepcopy(text)
        # [Audio Processing]
        audio_inputs = dict()
        discrete_audio_inputs = dict()
        if audios is not None and self.audio_processor is not None:
            if len(audios) > 0 and isinstance(audios[0], np.ndarray):
                # sample to batch if a single item is given
                audios = [audios, ]
            # continuous
            audio_inputs = self._process_continuous_audio(
                audios=audios,
            )
            # discrete
            discrete_audio_inputs = self._process_discrete_audio(
                audios=audios,
                **output_kwargs["audio_kwargs"],
            )
        # [Image Processing]
        image_inputs = dict()
        discrete_image_inputs = dict()
        if images is not None and self.image_processor is not None:
            if len(images) > 0 and isinstance(images[0], PIL.Image.Image):
                # sample to batch if a single item is given
                images = [images, ]
            # continuous: run the sub-processor per sample, then stack per key
            for _images in images:
                _image_inputs = self.image_processor(
                    images=_images,
                    **output_kwargs["images_kwargs"],
                )
                for _k, _v in _image_inputs.items():
                    if _k not in image_inputs:
                        image_inputs[_k] = list()
                    image_inputs[_k].append(_v)
            for _k, _v in image_inputs.items():
                if isinstance(_v[0], torch.Tensor):
                    image_inputs[_k] = torch.stack(_v, dim=0)
            # discrete
            discrete_image_inputs = self._process_discrete_images(
                images=images,
            )
        # [Video Processing]
        video_inputs = dict()
        if videos is not None and self.video_processor is not None:
            if len(videos) > 0 and isinstance(videos[0], np.ndarray):
                # sample to batch if a single item is given
                videos = [videos, ]
            # Video feature extraction, per sample, then stack per key
            for _videos in videos:
                _video_inputs = self.video_processor(
                    videos=_videos,
                    **output_kwargs["videos_kwargs"],
                )
                for _k, _v in _video_inputs.items():
                    if _k not in video_inputs:
                        video_inputs[_k] = list()
                    video_inputs[_k].append(_v)
            video_inputs = {
                _k: torch.stack(_v, dim=0) if isinstance(_v[0], torch.Tensor) else _v
                for _k, _v in video_inputs.items()
            }
        # [Expansion] - Audio
        if text is not None and audio_inputs:
            for _sample_idx, (_text_before, _audio_query_lengths, _discrete_audio_query_lengths) in enumerate(zip(
                text,
                audio_inputs["audio_query_lengths"],
                discrete_audio_inputs["discrete_audio_query_lengths"],
            )):
                _find_iters = list(re.finditer(re.escape(self.audio_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_audio_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_audio_match.start()
                        _inplace_str = self.get_audio_token_replacement(
                            audio_query_length=_audio_query_lengths[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )
                        # A discrete-audio placeholder may directly precede the
                        # continuous one; expand both in a single replacement.
                        _discrete_audio_match = re.search(
                            re.escape(self.discrete_audio_placeholder),
                            _text_before[_prev_end_idx:_continuous_audio_match.start()],
                        )
                        if _discrete_audio_match:
                            # BUGFIX: the match offset is relative to the slice that
                            # starts at _prev_end_idx; shift it to an absolute index.
                            _cur_start_idx = _prev_end_idx + _discrete_audio_match.start()
                            _discrete_inplace_str = self.get_discrete_audio_token_replacement(
                                discrete_audio_query_length=_discrete_audio_query_lengths[_idx],
                                include_boundary_tokens=True,
                                tokenize=False,
                            )
                            _inplace_str = f'{_discrete_inplace_str}{_inplace_str}'
                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_audio_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after
        # [Expansion] - Image
        if text is not None and image_inputs:
            for _sample_idx, (_text_before, _image_grid_thw, _discrete_image_ratios) in enumerate(zip(
                text,
                image_inputs["image_grid_thw"],
                discrete_image_inputs["discrete_image_ratios"],
            )):
                _find_iters = list(re.finditer(re.escape(self.image_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_image_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_image_match.start()
                        _inplace_str = self.get_image_token_replacement(
                            image_grid_thw=_image_grid_thw[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )
                        _discrete_image_match = re.search(
                            re.escape(self.discrete_image_placeholder),
                            _text_before[_prev_end_idx:_continuous_image_match.start()],
                        )
                        if _discrete_image_match:
                            # BUGFIX: shift the slice-relative offset to an absolute index.
                            _cur_start_idx = _prev_end_idx + _discrete_image_match.start()
                            _discrete_inplace_str = self.get_discrete_image_token_replacement(
                                discrete_image_ratio=_discrete_image_ratios[_idx],
                                include_boundary_tokens=True,
                                tokenize=False,
                            )
                            _inplace_str = f'{_discrete_inplace_str}{_inplace_str}'
                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_image_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after
        # [Expansion] - Video
        if text is not None and video_inputs:
            for _sample_idx, (_text_before, _video_grid_thw) in enumerate(zip(
                text,
                video_inputs["video_grid_thw"],
            )):
                _find_iters = list(re.finditer(re.escape(self.video_placeholder), _text_before, re.DOTALL))
                if len(_find_iters) > 0:
                    _text_after = ""
                    _prev_end_idx = 0
                    for _idx, _continuous_video_match in enumerate(_find_iters):
                        _cur_start_idx = _continuous_video_match.start()
                        _inplace_str = self.get_video_token_replacement(
                            video_grid_thw=_video_grid_thw[_idx],
                            include_boundary_tokens=True,
                            tokenize=False,
                        )
                        _text_after += _text_before[_prev_end_idx:_cur_start_idx]
                        _text_after += _inplace_str
                        _prev_end_idx = _continuous_video_match.end()
                    _text_after += _text_before[_prev_end_idx:]
                    text[_sample_idx] = _text_after
        return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None)
        return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", False)
        text_inputs = dict()
        if text is not None:
            text_inputs = self.tokenizer(text, **output_kwargs["text_kwargs"], return_tensors=None)
            self._check_special_mm_tokens(
                text,
                text_inputs,
                modalities=self.modalities,
            )
        if return_mm_token_type_ids:
            array_ids = np.array(text_inputs["input_ids"])
            mm_token_type_ids = np.zeros_like(text_inputs["input_ids"])
            # BUGFIX: use the id resolved in __init__ (`self.image_token_id`);
            # the image sub-processor is not guaranteed to expose `image_token_id`.
            mm_token_type_ids[array_ids == self.image_token_id] = 1
            if text_inputs:
                text_inputs["mm_token_type_ids"] = mm_token_type_ids.tolist()
        data = {
            **text_inputs,
            **image_inputs,
            **video_inputs,
            **discrete_image_inputs,
            **audio_inputs,
            **discrete_audio_inputs,
        }
        # `discrete_image_ratios` must not be coerced by BatchFeature's tensor
        # conversion; attach it after construction.
        _tensorable_data, _untensorable_data = dict(), dict()
        for _k, _v in data.items():
            if _k in [
                "discrete_image_ratios",
            ]:
                _untensorable_data[_k] = _v
            else:
                _tensorable_data[_k] = _v
        model_inputs = BatchFeature(data=_tensorable_data, tensor_type=return_tensors)
        model_inputs.update(_untensorable_data)
        return model_inputs

    def _process_continuous_audio(
        self,
        audios: Union[List[np.ndarray], List[List[np.ndarray]]],
        sample_rate: int = 16_000,
        chunk_unit: int = 80,
        min_chunk_size: int = 1_600,
        return_tensors: Optional[bool] = None,
    ):
        """Continuous Audio Preprocessing.

        Splits each waveform into 30 s chunks, extracts Whisper-style features,
        and computes per-waveform encoder output lengths (two stride-2 convs).
        Returns `audio_values`, `audio_masks`, `audio_query_lengths`, all stacked
        on a leading batch dimension.

        NOTE(review): samples in one batch are assumed to produce equally-shaped
        chunk tensors, otherwise the final `torch.stack` fails — confirm callers.
        """
        if len(audios) > 0 and isinstance(audios[0], np.ndarray):
            audios = [audios, ]
        audio_values, audio_masks, audio_query_lengths = list(), list(), list()
        for _audios in audios:
            _audio_values, _audio_masks, _audio_query_lengths = list(), list(), list()
            if len(_audios) == 0:
                # Empty sample: zero-sized feature tensors keep downstream shapes valid.
                _audio_values = torch.zeros(0, 128, 3000)
                _audio_masks = torch.zeros(0, 3000)
                _audio_query_lengths = [0, ]
            else:
                for _audio in _audios:
                    # Whisper features operate on at most 30 s windows.
                    chunks = []
                    for i in range(0, len(_audio), 30 * self.audio_processor.sampling_rate):
                        chunks.append(_audio[i : i + 30 * self.audio_processor.sampling_rate])
                    preprocess_results = self.audio_processor(
                        chunks,
                        sampling_rate=self.audio_processor.sampling_rate,
                        return_attention_mask=True,
                        padding="max_length",
                    )
                    _audio_value = preprocess_results.input_features
                    _audio_mask = preprocess_results.attention_mask
                    if isinstance(_audio_value, list):
                        _audio_value = np.array(_audio_value)
                    if isinstance(_audio_mask, list):
                        _audio_mask = np.array(_audio_mask)
                    # Two stride-2 downsampling stages determine the query length.
                    input_lengths = int(_audio_mask.sum())
                    input_lengths = (input_lengths - 1) // 2 + 1
                    output_lengths = (input_lengths - 2) // 2 + 1
                    _audio_values.append(torch.Tensor(_audio_value))
                    _audio_masks.append(torch.Tensor(_audio_mask))
                    _audio_query_lengths.append(output_lengths)
                _audio_values = torch.cat(_audio_values, dim=0)
                _audio_masks = torch.cat(_audio_masks, dim=0)
            _audio_query_lengths = torch.tensor(_audio_query_lengths)
            audio_values.append(_audio_values)
            audio_masks.append(_audio_masks)
            audio_query_lengths.append(_audio_query_lengths)
        audio_values = torch.stack(audio_values, dim=0)
        audio_masks = torch.stack(audio_masks, dim=0)
        audio_query_lengths = torch.stack(audio_query_lengths, dim=0)
        return {
            "audio_values": audio_values,
            "audio_masks": audio_masks,
            "audio_query_lengths": audio_query_lengths,
        }

    def _process_discrete_audio(
        self,
        audios: Union[List[np.ndarray], List[List[np.ndarray]]],
        sample_rate: int = 16_000,
        chunk_unit: int = 80,
        min_chunk_size: int = 1_600,
        return_tensors: Optional[bool] = None,
    ):
        """Discrete Audio Preprocessing.

        Validates each waveform (length, NaN/Inf, amplitude range) and computes
        the number of discrete codes the tokenizer will produce, chunking long
        audio into `chunk_unit`-second pieces. Raises `ValueError` on invalid
        input. Returns raw waveforms plus per-waveform code lengths.
        """
        if len(audios) > 0 and isinstance(audios[0], np.ndarray):
            audios = [audios, ]
        discrete_audio_values, discrete_audio_query_lengths = list(), list()
        for _audios in audios:
            _discrete_audio_values, _discrete_audio_query_lengths = list(), list()
            for _audio in _audios:
                audio_length = len(_audio)
                max_audio_length = 600 * sample_rate
                audio_duration_sec = audio_length / sample_rate
                if audio_length < min_chunk_size:
                    raise ValueError(f"Discrete audio too short: {audio_length}")
                if np.isnan(_audio).any() or np.isinf(_audio).any():
                    raise ValueError("Discrete audio contains NaN/Inf")
                if audio_length > max_audio_length:
                    raise ValueError(f"Discrete audio too long: {audio_length} samples = ({audio_duration_sec:.2f}s > 600s)")
                audio_min, audio_max = _audio.min().item(), _audio.max().item()
                if audio_min < -100.0 or audio_max > 100.0:
                    raise ValueError(f"Discrete audio values out of range: min {audio_min}, max {audio_max}")
                _audio_query_length = None
                if audio_length > chunk_unit * sample_rate:
                    # Long audio: sum per-chunk code lengths; a trailing remainder
                    # shorter than min_chunk_size is merged into the last chunk.
                    total_code_len = 0
                    chunk_size = chunk_unit * sample_rate
                    for start in range(0, audio_length, chunk_size):
                        end = min(start + chunk_size, audio_length)
                        if end < audio_length and audio_length - end < min_chunk_size:
                            end = audio_length
                        chunk_len = end - start
                        # Mel hop of 160 samples, then two stride-2 convs (k=3, p=1).
                        mel_len = chunk_len // 160
                        after_conv1 = (mel_len + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
                        code_len = (after_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
                        total_code_len += code_len
                        if end >= audio_length:
                            break
                    _audio_query_length = total_code_len
                else:
                    mel_len = audio_length // 160
                    after_conv1 = (mel_len + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
                    code_len = (after_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
                    _audio_query_length = code_len
                _discrete_audio_values.append(torch.tensor(_audio))
                _discrete_audio_query_lengths.append(_audio_query_length)
            _discrete_audio_values = torch.stack(_discrete_audio_values, dim=0)
            _discrete_audio_query_lengths = torch.tensor(_discrete_audio_query_lengths)
            discrete_audio_values.append(_discrete_audio_values)
            discrete_audio_query_lengths.append(_discrete_audio_query_lengths)
        discrete_audio_values = torch.stack(discrete_audio_values, dim=0)
        discrete_audio_query_lengths = torch.stack(discrete_audio_query_lengths, dim=0)
        return {
            "discrete_audio_values": discrete_audio_values,
            "discrete_audio_query_lengths": discrete_audio_query_lengths,
        }

    def _process_discrete_images(
        self,
        images: Union[List[PIL.Image.Image], List[List[PIL.Image.Image]]],
        return_tensors: Optional[bool] = None,
    ):
        """Discrete Image Preprocessing.

        Resizes every image to a fixed 384x384, converts it to a tensor, and
        records the best-matching aspect-ratio pair for the original size.
        """
        if len(images) > 0 and isinstance(images[0], PIL.Image.Image):
            images = [images, ]
        discrete_pixel_values, image_ratios = list(), list()
        for _images in images:
            _discrete_pixel_values, _image_ratios = list(), list()
            for _image in _images:
                w, h = _image.size
                _img_ratio = self._find_best_ratio_token([h, w])
                _discrete_pixel_value = _image.resize((384, 384), Image.BICUBIC)
                _discrete_pixel_tensor = to_tensor(_discrete_pixel_value)
                # squeeze(dim=0) is a no-op for CxHxW input; kept for parity with
                # single-channel inputs that to_tensor may produce.
                _discrete_pixel_tensor = _discrete_pixel_tensor.squeeze(dim=0)
                _discrete_pixel_values.append(_discrete_pixel_tensor)
                _image_ratios.append(torch.tensor(_img_ratio))
            _discrete_pixel_values = torch.stack(_discrete_pixel_values, dim=0)
            _image_ratios = torch.stack(_image_ratios, dim=0)
            discrete_pixel_values.append(_discrete_pixel_values)
            image_ratios.append(_image_ratios)
        discrete_pixel_values = torch.stack(discrete_pixel_values, dim=0)
        image_ratios = torch.stack(image_ratios, dim=0)
        return {
            "discrete_pixel_values": discrete_pixel_values,
            "discrete_image_ratios": image_ratios,
        }

    def _find_best_ratio_token(
        self,
        original_size: List[int],
    ):
        """Find the aspect-ratio pair `(i, j)` closest to `original_size` (h, w).

        Always returns a ratio *pair* (not the token string); callers convert it
        to a tensor and later map it to a token via `discrete_image_ratio_tokens`.
        """
        base_ratios = list(self.discrete_image_ratio_tokens.keys())
        # Expand each base ratio with its flip; drop the duplicated first entry.
        vision_aspect_ratios = [r for ratio in base_ratios for r in [ratio, ratio[::-1]]][1:]  # 13 ratios total
        # BUGFIX: the fallback branches previously returned the ratio *token string*
        # while the success path returns a tuple; callers call torch.tensor() on the
        # result, so return the neutral (1, 1) pair instead.
        if not isinstance(original_size, list) or len(original_size) != 2:
            return (1, 1)
        h, w = original_size
        if h == 0 or w == 0:
            return (1, 1)
        ratios = [i / j for i, j in vision_aspect_ratios]
        best_size_idx = np.argmin([abs(w / h - r) for r in ratios])
        i, j = vision_aspect_ratios[best_size_idx]
        return (i, j)

    def get_num_audio_tokens(
        self,
        audio_masks: torch.Tensor,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> int:
        """Number of continuous audio tokens implied by `audio_masks` (1D or 2D)."""
        kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )

        def _compute_num_audio_tokens(audio_mask: torch.Tensor):
            """audio_mask: shape (N,) — mirrors the two stride-2 convs."""
            input_length = (int(audio_mask.sum()) - 1) // 2 + 1
            num_audio_tokens = (input_length - 2) // 2 + 1
            return num_audio_tokens

        if len(audio_masks.shape) == 1:
            num_audio_tokens = _compute_num_audio_tokens(audio_mask=audio_masks)
        else:  # len(audio_masks.shape) == 2
            num_audio_tokens = sum([
                _compute_num_audio_tokens(audio_mask=_audio_mask)
                for _audio_mask in audio_masks
            ])
        # num_audio_tokens += 2  # <|audio_start|>, <|audio_end|>
        return num_audio_tokens

    def get_num_discrete_audio_tokens(
        self,
        discrete_audio_values: Optional[torch.Tensor] = None,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> int:
        """Number of discrete audio tokens for a raw waveform (chunked count)."""
        kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        audio_length = len(discrete_audio_values)
        num_audio_tokens = 0
        chunk_size = kwargs["audio_kwargs"].get("chunk_unit", 80) * kwargs["audio_kwargs"].get("sample_rate", 16_000)
        for _start in range(0, audio_length, chunk_size):
            _end = min(_start + chunk_size, audio_length)
            _chunked_length = _end - _start
            # Mel hop of 160 samples, then two stride-2 convs (k=3, p=1).
            _num_mel_frames = _chunked_length // 160
            _num_mel_frames_conv1 = (_num_mel_frames + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
            _num_audio_tokens = (_num_mel_frames_conv1 + 2 * 1 - 1 * (3 - 1) - 1) // 2 + 1
            num_audio_tokens += _num_audio_tokens
        # num_audio_tokens += 2  # <|discrete_audio_start|>, <|discrete_audio_end|>
        return num_audio_tokens

    def get_num_image_tokens(
        self,
        image_width: Optional[int] = None,
        image_height: Optional[int] = None,
        pixel_values: Optional[torch.Tensor] = None,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> int:
        """Number of continuous image tokens, from dimensions or pixel_values."""
        kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        image_processor_merge_size = 2
        if self.image_processor is not None:
            image_processor_merge_size = getattr(self.image_processor, "merge_size", 2)
        num_image_tokens = None
        if pixel_values is None:
            # Derive the patch count from the requested output size.
            images_kwargs = Qwen2_5_VLProcessorKwargs._defaults.get("images_kwargs", {})
            images_kwargs.update(kwargs["images_kwargs"])
            num_image_patches = self.image_processor.get_number_of_image_patches(
                image_height,
                image_width,
                images_kwargs,
            )
            num_image_tokens = num_image_patches // (image_processor_merge_size ** 2)
        elif len(pixel_values.shape) == 2:
            num_image_tokens = pixel_values.shape[0] // (image_processor_merge_size ** 2)
        else:  # len(pixel_values.shape) == 3
            num_image_tokens = sum([
                _pixel_values.shape[0] // (image_processor_merge_size ** 2)
                for _pixel_values in pixel_values
            ])
        # num_image_tokens += 2  # <|image_start|>, <|image_end|>
        return num_image_tokens

    def get_num_discrete_image_tokens(
        self,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> int:
        """Number of discrete image tokens: a fixed grid plus one <|vision_eol|> per row."""
        kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        discrete_token_size = self.image_processor.discrete_token_size
        num_image_tokens = discrete_token_size ** 2 + discrete_token_size  # <|vision_eol|>
        # num_image_tokens += 3  # <|discrete_image_start|>, <|vision_eof|>, <|discrete_image_end|>
        return num_image_tokens

    def get_num_video_tokens(
        self,
        image_width: Optional[int] = None,
        image_height: Optional[int] = None,
        num_frames: Optional[int] = None,
        pixel_values_videos: Optional[torch.Tensor] = None,
        **kwargs: Unpack[HyperCLOVAXOmniProcessorKwargs],
    ) -> int:
        """Number of video tokens, from dimensions or pixel_values_videos."""
        kwargs = self._merge_kwargs(
            HyperCLOVAXOmniProcessorKwargs,
            tokenizer_init_kwargs=self.tokenizer.init_kwargs,
            **kwargs,
        )
        video_processor_merge_size = 2
        if self.video_processor is not None:
            video_processor_merge_size = getattr(self.video_processor, "merge_size", 2)
        # BUGFIX: `if not pixel_values_videos:` raises on multi-element tensors
        # (ambiguous truthiness); use an explicit None check, matching
        # get_num_image_tokens.
        if pixel_values_videos is None:
            videos_kwargs = Qwen2_5_VLProcessorKwargs._defaults.get("videos_kwargs", {})
            videos_kwargs.update(kwargs["videos_kwargs"])
            num_video_patches = self.video_processor.get_num_of_video_patches(
                num_frames,
                image_height,
                image_width,
                videos_kwargs,
            )
            num_video_tokens = num_video_patches // (video_processor_merge_size ** 2)
        elif len(pixel_values_videos.shape) == 2:
            num_video_tokens = pixel_values_videos.shape[0] // (video_processor_merge_size ** 2)
        else:  # len(pixel_values_videos.shape) == 3
            num_video_tokens = sum([
                _pixel_values_videos.shape[0] // (video_processor_merge_size ** 2)
                for _pixel_values_videos in pixel_values_videos
            ])
        # num_video_tokens += 2  # <|video_start|>, <|video_end|>
        return num_video_tokens

    def get_audio_token_replacement(
        self,
        audio_query_length: int,
        include_boundary_tokens: Optional[bool] = True,
        tokenize: Optional[bool] = False,
    ):
        """Expand one audio placeholder into `audio_query_length` audio tokens."""
        replacement = self.audio_processor.audio_token * int(audio_query_length)
        if include_boundary_tokens:
            replacement = f'{self.audio_processor.audio_start_token}{replacement}{self.audio_processor.audio_end_token}'
        if tokenize:
            replacement = self.tokenizer.encode(replacement)
        return replacement

    def get_discrete_audio_token_replacement(
        self,
        discrete_audio_query_length: Optional[int] = None,
        include_boundary_tokens: Optional[bool] = True,
        tokenize: Optional[bool] = False,
    ):
        """Expand one discrete-audio placeholder into the given number of tokens."""
        replacement = self.audio_processor.discrete_audio_token * int(discrete_audio_query_length)
        if include_boundary_tokens:
            replacement = f'{self.audio_processor.discrete_audio_start_token}{replacement}{self.audio_processor.discrete_audio_end_token}'
        if tokenize:
            replacement = self.tokenizer.encode(replacement)
        return replacement

    def get_image_token_replacement(
        self,
        image_grid_thw: List[int],
        include_boundary_tokens: Optional[bool] = True,
        tokenize: Optional[bool] = False,
    ):
        """Expand one image placeholder into grid-derived image tokens."""
        merge_length = self.image_processor.merge_size ** 2
        _num_image_tokens = image_grid_thw.prod() // merge_length
        replacement = self.image_processor.image_token * int(_num_image_tokens)
        if include_boundary_tokens:
            replacement = f'{self.image_processor.image_start_token}{replacement}{self.image_processor.image_end_token}'
        if tokenize:
            replacement = self.tokenizer.encode(replacement)
        return replacement

    def get_discrete_image_token_replacement(
        self,
        discrete_image_ratio: Optional[List[int]] = None,
        include_boundary_tokens: Optional[bool] = True,
        tokenize: Optional[bool] = False,
    ):
        """Expand one discrete-image placeholder: ratio token + token grid with row EOLs."""
        discrete_token_size = self.image_processor.discrete_token_size
        _row_str = f'{(self.image_processor.discrete_image_token * discrete_token_size)}{self.image_processor.vision_eol_token}'
        # BUGFIX: the ratio key previously used index [0] twice, which raises
        # KeyError for any non-square ratio; use the (h, w) pair as declared in
        # __init__'s discrete_image_ratio_tokens.
        _discrete_image_ratio_token = self.discrete_image_ratio_tokens[(discrete_image_ratio[0], discrete_image_ratio[1])]
        replacement = f'{_discrete_image_ratio_token}{(_row_str * discrete_token_size)}'
        if include_boundary_tokens:
            replacement = f'{self.image_processor.discrete_image_start_token}{replacement}{self.image_processor.discrete_image_end_token}'
        if tokenize:
            replacement = self.tokenizer.encode(replacement)
        return replacement

    def get_video_token_replacement(
        self,
        video_grid_thw: List[int],
        include_boundary_tokens: Optional[bool] = True,
        tokenize: Optional[bool] = False,
    ):
        """Expand one video placeholder into grid-derived video tokens."""
        merge_length = self.video_processor.merge_size ** 2
        _num_video_tokens = video_grid_thw.prod() // merge_length
        replacement = self.video_processor.video_token * int(_num_video_tokens)
        if include_boundary_tokens:
            replacement = f'{self.video_processor.video_start_token}{replacement}{self.video_processor.video_end_token}'
        if tokenize:
            replacement = self.tokenizer.encode(replacement)
        return replacement