| | """Image processor class for Kimi-K2.5. |
| | """ |
| |
|
| | import json |
| | from typing import Any, Dict, Optional, Union |
| |
|
| | import numpy as np |
| | import torch |
| | from PIL import Image |
| | from transformers.image_processing_utils import (BaseImageProcessor, |
| | BatchFeature) |
| | from transformers.utils import TensorType |
| |
|
| | from .media_utils import (MediaInput, VideoChunkInput, _to_tensor, |
| | ensure_media_type, get_video_meta, image_to_np, |
| | navit_patchify, navit_resize_image, |
| | navit_resize_video, normalize, |
| | real_sample_fps_and_max_num_frames, timestamp_as_str) |
| |
|
| | try: |
| | from mecord import VideoReader |
| | except ImportError: |
| | VideoReader = None |
| |
|
| |
|
def resampling(video_bytes: bytes,
               sample_indices: list[int],
               key_indices=None,
               frame_time_info=None,
               num_threads=4) -> "list[Image.Image]":
    """Decode selected frames of a video and return them as PIL images.

    A ``VideoReader`` is built over ``video_bytes``, indexed with
    ``sample_indices``, and every resulting frame is converted with
    ``Image.fromarray``.

    Args:
        video_bytes: Video source handed to ``VideoReader`` as-is.
            NOTE(review): ``split_video_chunks`` passes a ``str | bytes``
            value here, so the reader presumably accepts paths/URLs too --
            confirm against the ``mecord`` API.
        sample_indices: Frame indices to decode, in output order.
        key_indices: Forwarded unchanged to ``VideoReader``.
        frame_time_info: Forwarded unchanged to ``VideoReader``.
        num_threads: Forwarded unchanged to ``VideoReader``.

    Returns:
        One ``PIL.Image.Image`` per decoded frame.

    Raises:
        ImportError: If the optional ``mecord`` dependency could not be
            imported (``VideoReader`` is ``None`` in that case).
    """
    # The module-level import falls back to ``VideoReader = None``; fail with
    # an actionable message instead of "'NoneType' object is not callable".
    if VideoReader is None:
        raise ImportError(
            "resampling() requires the optional `mecord` package, "
            "which could not be imported.")

    video = VideoReader(video_bytes,
                        num_threads=num_threads,
                        frame_time_info=frame_time_info,
                        key_indices=key_indices)

    return [Image.fromarray(frame) for frame in video[sample_indices]]
| |
|
| |
|
class KimiK25VisionProcessor(BaseImageProcessor):
    """Vision preprocessor for Kimi-K2.5 images and video chunks.

    All behaviour is driven by ``media_proc_cfg``: each media item is resized
    and padded according to the NaViT resize helpers, normalized with
    ``image_mean`` / ``image_std``, patchified with ``patch_size`` and packed
    into a ``BatchFeature``.
    """

    model_type = "kimi_k25"

    def __init__(
        self,
        media_proc_cfg: dict,
        **kwargs,
    ):
        """Store the media configuration.

        Args:
            media_proc_cfg: Processing configuration. Must contain
                ``'temporal_merge_kernel_size'``; the other keys are read
                lazily by the individual methods.
            **kwargs: Forwarded to ``BaseImageProcessor.__init__``.

        Raises:
            KeyError: If ``'temporal_merge_kernel_size'`` is missing.
        """
        super().__init__(**kwargs)
        self.media_proc_cfg = media_proc_cfg
        # Assigned after super().__init__ so a stale value arriving through
        # kwargs cannot shadow the one derived from the config.
        self.num_frames_per_chunk = media_proc_cfg[
            'temporal_merge_kernel_size']

    def media_tokens_calculator(self, media: MediaInput):
        """Return the ``'num_tokens'`` entry of the resize config for ``media``."""
        media = ensure_media_type(media)
        return self.get_resize_config(media)['num_tokens']

    @classmethod
    def make_chunk_prompt(cls, timestamp_text: str) -> str:
        """Build the prompt for one video chunk: timestamp text plus media tags."""
        return f"{timestamp_text}<|media_begin|>video<|media_content|><|media_pad|><|media_end|>"

    def split_video_chunks(self,
                           video_url: str | bytes) -> list[VideoChunkInput]:
        """Sample frames from a video and group them into timestamped chunks.

        Frames are sampled evenly (``np.linspace``) at
        ``min(cfg['sample_fps'], video fps)`` and grouped into runs of
        ``cfg['temporal_merge_kernel_size']`` frames; the last chunk may be
        shorter. Each chunk's prompt carries the timestamp of its first
        frame, formatted with ``cfg['timestamp_mode']``.

        Args:
            video_url: Video source passed to ``get_video_meta`` and
                ``resampling``.

        Returns:
            One ``VideoChunkInput`` (``type="video_chunk"``) per chunk, in
            temporal order.
        """
        cfg = self.media_proc_cfg
        video_spec = get_video_meta(video_url)

        sample_fps = min(cfg['sample_fps'], video_spec.fps)
        # Always keep at least one frame, even for very short videos.
        sampled_nframes = max(
            round(video_spec.num_frames * sample_fps / video_spec.fps), 1)
        frame_inds = np.linspace(0, video_spec.num_frames - 1,
                                 sampled_nframes).round().astype(int).tolist()

        kernel_size = cfg["temporal_merge_kernel_size"]
        chunk_starts = range(0, len(frame_inds), kernel_size)

        # Timestamps are derived from the ORIGINAL frame index of the first
        # frame in each chunk, before any decoding happens.
        chunk_timestamps = [
            timestamp_as_str(frame_inds[start] / float(video_spec.fps),
                             cfg["timestamp_mode"])
            for start in chunk_starts
        ]

        # Decode every sampled frame in a single pass, then slice per chunk.
        sampled_frames = resampling(video_url, frame_inds)

        return [
            VideoChunkInput(type="video_chunk",
                            video_chunk=sampled_frames[start:start +
                                                       kernel_size],
                            prompt=self.make_chunk_prompt(timestamp_text))
            for start, timestamp_text in zip(chunk_starts, chunk_timestamps)
        ]

    def get_resize_config(self, media_input: MediaInput) -> dict:
        """Compute the NaViT resize configuration for one media item.

        Args:
            media_input: A media dict whose ``'type'`` is ``'image'`` (with an
                ``'image'`` entry) or ``'video_chunk'`` (with a non-empty
                ``'video_chunk'`` list of frames).

        Returns:
            The dict produced by ``navit_resize_image`` or
            ``navit_resize_video``. Callers in this class read the keys
            ``'num_tokens'``, ``'new_width'``, ``'new_height'``,
            ``'pad_width'`` and ``'pad_height'`` from it.

        Raises:
            ValueError: If ``media_input['type']`` is not supported.
        """
        cfg = self.media_proc_cfg
        media_type = media_input['type']

        if media_type == 'image':
            width, height = media_input['image'].size
            return navit_resize_image(
                width, height, cfg['patch_size'], cfg['merge_kernel_size'],
                cfg['in_patch_limit'], cfg['patch_limit_on_one_side'],
                cfg['fixed_output_tokens'])

        if media_type == 'video_chunk':
            frames = media_input['video_chunk']
            # All frames of a chunk are sized from the first frame.
            width, height = frames[0].size
            num_frames = len(frames)
            # Chunks hold already-sampled frames; the source fps is passed
            # as a fixed 1.0.
            fps = 1.0

            sample_fps, max_num_frames_each_video = real_sample_fps_and_max_num_frames(
                media_type,
                cfg['sample_fps'],
                cfg['max_num_frames_each_video'],
            )

            # A missing key is treated like an explicit None: fall back to
            # the image-level patch limit.
            in_patch_limit_each_frame = cfg.get('in_patch_limit_each_frame')
            if in_patch_limit_each_frame is None:
                in_patch_limit_each_frame = cfg['in_patch_limit']

            return navit_resize_video(
                width,
                height,
                num_frames,
                fps,
                sample_fps,
                cfg['patch_size'],
                cfg['merge_kernel_size'],
                in_patch_limit_each_frame,
                cfg['patch_limit_on_one_side'],
                cfg['in_patch_limit_video'],
                max_num_frames_each_video,
                cfg['fixed_output_tokens'],
            )

        raise ValueError("Unsupported type: {}".format(media_type))

    def resize_image(self, image: Image.Image, new_width: int, new_height: int,
                     pad_width: int, pad_height: int) -> np.ndarray:
        """Resize ``image`` and zero-pad the result.

        The image is converted with ``image_to_np`` in ``"resize"`` mode to
        ``(new_width, new_height)``; the array is then padded with zeros by
        ``pad_height`` at the end of axis 0 and ``pad_width`` at the end of
        axis 1 (presumably height and width of an H x W x C array).
        """
        resized = image_to_np(image, (new_width, new_height), "resize")
        return np.pad(
            resized,
            ((0, pad_height), (0, pad_width), (0, 0)),
            mode="constant",
            constant_values=0,
        )

    def preprocess(
        self,
        medias: list[MediaInput],
        return_tensors: Optional[Union[str, TensorType]] = None,
    ) -> BatchFeature:
        """
        Preprocess atomic vision inputs (images / video chunks) into model-ready tensors.

        Args:
            medias: List of MediaInput. A single non-list item is wrapped
                into a one-element list.
            return_tensors: Desired output format ('pt', 'np', 'tf', or None).

        Returns:
            BatchFeature containing 'pixel_values' and 'grid_thws' tensors,
            or an empty BatchFeature when ``medias`` is empty.

        Raises:
            ValueError: If an item has an unsupported ``'type'``.
        """
        if not isinstance(medias, list):
            medias = [medias]
        if not medias:
            return BatchFeature(data={}, tensor_type=return_tensors)

        cfg = self.media_proc_cfg

        # Phase 1: resize + pad every item into a (frames, ...) array.
        pixel_arrays = []
        for item in medias:
            item = ensure_media_type(item)
            resize_config = self.get_resize_config(item)
            new_width = resize_config['new_width']
            new_height = resize_config['new_height']
            pad_width = resize_config['pad_width']
            pad_height = resize_config['pad_height']

            if item['type'] == 'image':
                image_np = self.resize_image(item['image'], new_width,
                                             new_height, pad_width, pad_height)
                # A still image is treated as a single-frame clip.
                pixel_arrays.append(np.expand_dims(image_np, axis=0))
            elif item['type'] == 'video_chunk':
                frames_np = [
                    self.resize_image(frame, new_width, new_height, pad_width,
                                      pad_height)
                    for frame in item['video_chunk']
                ]
                pixel_arrays.append(np.stack(frames_np, axis=0))
            else:
                raise ValueError("Unsupported type: {}".format(item['type']))

        # Phase 2: normalize and patchify. The inverse std is computed once
        # and handed to ``normalize`` together with the mean.
        image_std_inv = 1.0 / np.array(cfg['image_std'])
        image_mean = np.array(cfg['image_mean'])
        patchified = [
            navit_patchify(normalize(pixels, image_mean, image_std_inv),
                           cfg['patch_size']) for pixels in pixel_arrays
        ]

        # Phase 3: concatenate patches of all items; stack one grid_thw row
        # per item.
        pixel_values = torch.cat(
            [_to_tensor(entry['pixel_values']) for entry in patchified])
        grid_thws = torch.cat([
            _to_tensor(entry['grid_thw'], dtype=torch.int64).unsqueeze(0)
            for entry in patchified
        ])

        data = {
            'pixel_values': pixel_values,
            'grid_thws': grid_thws,
        }
        return BatchFeature(data=data, tensor_type=return_tensors)

    def __repr__(self):
        return f"KimiK25VisionProcessor(media_proc_cfg={self.media_proc_cfg})"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a dict, with ``media_proc_cfg`` set and any ``media_processor`` entry removed."""
        output = super().to_dict()
        output["media_proc_cfg"] = self.media_proc_cfg
        output.pop("media_processor", None)
        return output

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any], **kwargs):
        """Instantiate the processor from a config dict.

        Args:
            config_dict: Serialized configuration; left unmodified.
            **kwargs: Overrides. On a key clash the value from ``kwargs``
                wins (including ``media_proc_cfg``) instead of raising a
                duplicate-keyword ``TypeError``.

        Returns:
            A new processor instance. ``media_proc_cfg`` defaults to ``{}``
            when absent, in which case ``__init__`` raises ``KeyError`` for
            the missing ``'temporal_merge_kernel_size'``.
        """
        init_kwargs = {**config_dict, **kwargs}
        media_proc_cfg = init_kwargs.pop("media_proc_cfg", {})
        return cls(media_proc_cfg=media_proc_cfg, **init_kwargs)

    def to_json_string(self):
        """Serialize ``to_dict()`` as indented, key-sorted JSON ending in a newline.

        Values exposing ``tolist()`` (e.g. numpy arrays or torch tensors) are
        converted to plain lists at any nesting depth, so array-valued
        entries inside ``media_proc_cfg`` serialize as well.
        """

        def jsonable(value):
            # Convert array-likes first; then recurse through containers.
            if hasattr(value, 'tolist'):
                return value.tolist()
            if isinstance(value, dict):
                return {key: jsonable(item) for key, item in value.items()}
            if isinstance(value, (list, tuple)):
                return [jsonable(item) for item in value]
            return value

        return json.dumps(jsonable(self.to_dict()), indent=2,
                          sort_keys=True) + "\n"
| |
|