|
|
"""Image processor class for Kimi-K2.5. |
|
|
""" |
|
|
|
|
|
import json |
|
|
from typing import Any, Dict, Optional, Union |
|
|
|
|
|
import numpy as np |
|
|
import torch |
|
|
from PIL import Image |
|
|
from transformers.image_processing_utils import (BaseImageProcessor, |
|
|
BatchFeature) |
|
|
from transformers.utils import TensorType |
|
|
|
|
|
from .media_utils import (MediaInput, VideoChunkInput, _to_tensor, |
|
|
ensure_media_type, get_video_meta, image_to_np, |
|
|
navit_patchify, navit_resize_image, |
|
|
navit_resize_video, normalize, |
|
|
real_sample_fps_and_max_num_frames, timestamp_as_str) |
|
|
|
|
|
# mecord provides the native video-decoding backend and is an optional
# dependency: when it is not installed, VideoReader is left as None and any
# video decoding attempt will fail at call time rather than at import time.
try:
    from mecord import VideoReader
except ImportError:
    VideoReader = None
|
|
|
|
|
|
|
|
def resampling(video_bytes: bytes,
               sample_indices: list[int],
               key_indices=None,
               frame_time_info=None,
               num_threads=4) -> list[Image.Image]:
    """Decode the requested frames from an in-memory/encoded video.

    Args:
        video_bytes: Raw encoded video data (whatever ``VideoReader``
            accepts; callers also pass URLs/paths — see ``split_video_chunks``).
        sample_indices: Frame indices to decode, in the order they should be
            returned.
        key_indices: Optional key-frame index hints forwarded to
            ``VideoReader``.
        frame_time_info: Optional frame-timing metadata forwarded to
            ``VideoReader``.
        num_threads: Number of decoder threads.

    Returns:
        The decoded frames as ``PIL.Image.Image`` objects.
        (Fixed: the original annotation said ``-> str``, which was wrong.)

    Raises:
        ImportError: If the optional ``mecord`` dependency is not installed.
    """
    # Fail with a clear message instead of "'NoneType' object is not
    # callable" when the optional mecord backend is missing.
    if VideoReader is None:
        raise ImportError(
            "mecord is required for video frame decoding but is not installed"
        )
    video = VideoReader(video_bytes,
                        num_threads=num_threads,
                        frame_time_info=frame_time_info,
                        key_indices=key_indices)
    frames = video[sample_indices]
    return [Image.fromarray(frame) for frame in frames]
|
|
|
|
|
|
|
|
class KimiK25VisionProcessor(BaseImageProcessor):
    """Vision (image/video) preprocessor for Kimi-K2.5.

    Turns ``MediaInput`` items — single images or fixed-size video chunks —
    into patchified pixel tensors plus their per-item (t, h, w) grid shapes,
    using the NaViT-style resize/patchify helpers from ``media_utils``.
    """

    model_type = "kimi_k25"

    def __init__(
        self,
        media_proc_cfg: dict,
        **kwargs,
    ) -> None:
        """Store the media-processing configuration.

        Args:
            media_proc_cfg: Preprocessing options (patch size, merge kernel
                sizes, fps/frame/token limits, normalization stats). The
                individual keys are read lazily by the other methods.
            **kwargs: Forwarded to ``BaseImageProcessor``.
        """
        super().__init__(**kwargs)
        self.media_proc_cfg = media_proc_cfg
        # One video chunk holds exactly `temporal_merge_kernel_size` frames.
        self.num_frames_per_chunk = media_proc_cfg[
            'temporal_merge_kernel_size']

    def media_tokens_calculator(self, media: MediaInput):
        """Return the number of media tokens this input will occupy."""
        media = ensure_media_type(media)
        ret = self.get_resize_config(media)
        return ret['num_tokens']

    @classmethod
    def make_chunk_prompt(cls, timestamp_text: str) -> str:
        """Build the prompt text for one video chunk: the chunk's timestamp
        followed by the special media placeholder tokens."""
        return f"{timestamp_text}<|media_begin|>video<|media_content|><|media_pad|><|media_end|>"

    def split_video_chunks(self,
                           video_url: str | bytes) -> list[list[Image.Image]]:
        """Sample frames from a video and group them into timestamped chunks.

        Frames are sampled uniformly at (at most) the configured
        ``sample_fps``, then grouped into consecutive runs of
        ``temporal_merge_kernel_size`` frames. Each run becomes a
        ``VideoChunkInput`` whose prompt carries the timestamp of its first
        frame.

        Args:
            video_url: Video location or raw encoded bytes; passed to both
                ``get_video_meta`` and ``resampling``.

        Returns:
            List of ``VideoChunkInput`` items, in temporal order.
        """
        video_spec = get_video_meta(video_url)
        # Never sample faster than the source frame rate.
        sample_fps = min(self.media_proc_cfg['sample_fps'], video_spec.fps)
        sampled_nframes = max(
            round(video_spec.num_frames * sample_fps / video_spec.fps), 1)
        # Spread the sample points uniformly over the whole clip.
        frame_inds = np.linspace(0, video_spec.num_frames - 1,
                                 sampled_nframes).round().astype(int)
        frame_inds = frame_inds.tolist()
        sampled_frame_ids = []
        temporal_merge_kernel_size = self.media_proc_cfg[
            "temporal_merge_kernel_size"]
        num_chunks = 0
        chunk_timestamp = []
        for i in range(0, len(frame_inds), temporal_merge_kernel_size):
            # The final chunk may hold fewer than kernel-size frames.
            sampled_frame_ids.extend(frame_inds[i:i +
                                                temporal_merge_kernel_size])
            # Timestamp (seconds) of the chunk's first frame.
            start_time = frame_inds[i] / float(video_spec.fps)
            timestamp_text = timestamp_as_str(
                start_time, self.media_proc_cfg["timestamp_mode"])
            chunk_timestamp.append(timestamp_text)
            num_chunks += 1

        # Decode all sampled frames in one pass, then slice them per chunk.
        sampled_frames = resampling(video_url, sampled_frame_ids)
        chunks = []
        for chunk_id in range(num_chunks):
            chunk = sampled_frames[chunk_id *
                                   temporal_merge_kernel_size:(chunk_id + 1) *
                                   temporal_merge_kernel_size]
            chunks.append(
                VideoChunkInput(type="video_chunk",
                                video_chunk=chunk,
                                prompt=self.make_chunk_prompt(
                                    chunk_timestamp[chunk_id])))
        return chunks

    def get_resize_config(self, media_input: MediaInput) -> dict:
        """Compute the resize/padding/token geometry for one media item.

        Delegates to ``navit_resize_image`` / ``navit_resize_video``. The
        returned dict contains at least 'new_width', 'new_height',
        'pad_width', 'pad_height' and 'num_tokens' (as consumed by
        ``preprocess`` and ``media_tokens_calculator``).

        Raises:
            ValueError: If ``media_input['type']`` is neither 'image' nor
                'video_chunk'.
        """
        if media_input['type'] == 'image':
            w, h = media_input['image'].size
            ret = navit_resize_image(
                w, h, self.media_proc_cfg['patch_size'],
                self.media_proc_cfg['merge_kernel_size'],
                self.media_proc_cfg['in_patch_limit'],
                self.media_proc_cfg['patch_limit_on_one_side'],
                self.media_proc_cfg['fixed_output_tokens'])
            return ret
        elif media_input['type'] == 'video_chunk':
            # Geometry is derived from the first frame; all frames in a
            # chunk are assumed to share its size.
            frame = media_input['video_chunk'][0]
            width, height = frame.size
            num_frames = len(media_input["video_chunk"])
            # NOTE(review): fps is fixed at 1.0 here — presumably because
            # temporal sampling already happened in split_video_chunks;
            # confirm against navit_resize_video's expectations.
            fps = 1.0

            sample_fps, max_num_frames_each_video = real_sample_fps_and_max_num_frames(
                media_input["type"],
                self.media_proc_cfg['sample_fps'],
                self.media_proc_cfg['max_num_frames_each_video'],
            )

            # Per-frame patch limit falls back to the global image limit.
            in_patch_limit_each_frame = self.media_proc_cfg[
                'in_patch_limit_each_frame']
            if in_patch_limit_each_frame is None:
                in_patch_limit_each_frame = self.media_proc_cfg[
                    'in_patch_limit']

            ret = navit_resize_video(
                width,
                height,
                num_frames,
                fps,
                sample_fps,
                self.media_proc_cfg['patch_size'],
                self.media_proc_cfg['merge_kernel_size'],
                in_patch_limit_each_frame,
                self.media_proc_cfg['patch_limit_on_one_side'],
                self.media_proc_cfg['in_patch_limit_video'],
                max_num_frames_each_video,
                self.media_proc_cfg['fixed_output_tokens'],
            )
            return ret
        else:
            raise ValueError("Unsupported type: {}".format(
                media_input['type']))

    def resize_image(self, image: Image.Image, new_width: int, new_height: int,
                     pad_width: int, pad_height: int) -> np.ndarray:
        """Resize an image, then zero-pad its bottom/right edges.

        The padding brings the array to the patch-aligned target size
        computed by ``get_resize_config``; padded pixels are zeros.
        """
        image_np = image_to_np(image, (new_width, new_height), "resize")
        image_np = np.pad(
            image_np,
            ((0, pad_height), (0, pad_width), (0, 0)),
            mode="constant",
            constant_values=0,
        )
        return image_np

    def preprocess(
        self,
        medias: list[MediaInput],
        return_tensors: Optional[Union[str, TensorType]] = None,
    ) -> BatchFeature:
        """Preprocess atomic vision inputs (images / video chunks) into
        model-ready tensors.

        Each item is resized and padded, normalized with the configured
        mean/std, and patchified; the per-item results are concatenated.

        Args:
            medias: List of MediaInput (a single non-list item is also
                accepted and wrapped).
            return_tensors: Desired output format ('pt', 'np', 'tf', or None).

        Returns:
            BatchFeature containing 'pixel_values' and 'grid_thws' tensors;
            an empty BatchFeature when `medias` is empty.
        """
        if not isinstance(medias, list):
            medias = [medias]
        if medias:
            pixel_values = []
            for item in medias:
                item = ensure_media_type(item)
                resize_config = self.get_resize_config(item)
                new_width, new_height, pad_width, pad_height = resize_config[
                    'new_width'], resize_config['new_height'], resize_config[
                        'pad_width'], resize_config['pad_height']
                if item['type'] == 'image':
                    image = item['image']
                    image_np = self.resize_image(image, new_width, new_height,
                                                 pad_width, pad_height)
                    # Add a singleton leading axis so images match the
                    # frame-stacked layout of video chunks below.
                    pixel_values.append(np.expand_dims(image_np, axis=0))
                elif item['type'] == 'video_chunk':
                    pixels = []
                    for frame in item['video_chunk']:
                        frame_np = self.resize_image(frame, new_width,
                                                     new_height, pad_width,
                                                     pad_height)
                        pixels.append(frame_np)
                    pixel_values.append(np.stack(pixels, axis=0))
                else:
                    raise ValueError("Unsupported type: {}".format(
                        item['type']))
            normalized_pixel_values = []
            # std is passed pre-inverted (1/std); presumably normalize()
            # multiplies by it rather than dividing — confirm in media_utils.
            image_std_inv = 1.0 / np.array(self.media_proc_cfg['image_std'])
            image_mean = np.array(self.media_proc_cfg['image_mean'])
            for pixels in pixel_values:
                pixels = normalize(pixels, image_mean, image_std_inv)
                pixels_and_thw = navit_patchify(
                    pixels,
                    self.media_proc_cfg['patch_size'],
                )
                normalized_pixel_values.append(pixels_and_thw)

            # Concatenate every item's patches into one tensor; grid_thws
            # keeps one (t, h, w) row per item so they can be split apart
            # again downstream.
            pixel_values = torch.cat([
                _to_tensor(pixel_value['pixel_values'])
                for pixel_value in normalized_pixel_values
            ])
            grid_thws = torch.cat([
                _to_tensor(pixel_value['grid_thw'],
                           dtype=torch.int64).unsqueeze(0)
                for pixel_value in normalized_pixel_values
            ])

            data = {
                'pixel_values': pixel_values,
                'grid_thws': grid_thws,
            }

        else:
            data = {}

        return BatchFeature(data=data, tensor_type=return_tensors)

    def __repr__(self) -> str:
        return f"KimiK25VisionProcessor(media_proc_cfg={self.media_proc_cfg})"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the processor config; drops the non-serializable
        'media_processor' entry when present."""
        output = super().to_dict()
        output["media_proc_cfg"] = self.media_proc_cfg
        if "media_processor" in output:
            del output["media_processor"]
        return output

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any], **kwargs):
        """Rebuild a processor from a config dict (inverse of ``to_dict``)."""
        config = config_dict.copy()
        media_proc_cfg = config.pop("media_proc_cfg", {})
        return cls(media_proc_cfg=media_proc_cfg, **config, **kwargs)

    def to_json_string(self) -> str:
        """Serialize the config to JSON, converting array-like values via
        ``tolist`` so they are JSON-encodable."""
        dictionary = self.to_dict()
        for key, value in dictionary.items():
            if hasattr(value, 'tolist'):
                dictionary[key] = value.tolist()
        return json.dumps(dictionary, indent=2, sort_keys=True) + "\n"
|
|
|