"""LLM-jp-VL Processor — combines SigLIP image processing + dynamic patching + tokenization.""" from typing import List, Optional, Union import torch from PIL import Image from transformers import BatchFeature, ProcessorMixin def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size): best_ratio_diff = float("inf") best_ratio = (1, 1) area = width * height for ratio in target_ratios: target_aspect_ratio = ratio[0] / ratio[1] ratio_diff = abs(aspect_ratio - target_aspect_ratio) if ratio_diff < best_ratio_diff: best_ratio_diff = ratio_diff best_ratio = ratio elif ratio_diff == best_ratio_diff: if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]: best_ratio = ratio return best_ratio def dynamic_preprocess( image, min_num=1, max_num=12, image_size=512, use_thumbnail=False ): orig_width, orig_height = image.size aspect_ratio = orig_width / orig_height target_ratios = set( (i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if i * j <= max_num and i * j >= min_num ) target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1]) target_aspect_ratio = find_closest_aspect_ratio( aspect_ratio, target_ratios, orig_width, orig_height, image_size ) target_width = image_size * target_aspect_ratio[0] target_height = image_size * target_aspect_ratio[1] blocks = target_aspect_ratio[0] * target_aspect_ratio[1] resized_img = image.resize((target_width, target_height)) processed_images = [] for i in range(blocks): box = ( (i % (target_width // image_size)) * image_size, (i // (target_width // image_size)) * image_size, ((i % (target_width // image_size)) + 1) * image_size, ((i // (target_width // image_size)) + 1) * image_size, ) processed_images.append(resized_img.crop(box)) if use_thumbnail and len(processed_images) != 1: processed_images.append(image.resize((image_size, image_size))) return processed_images class LLMjpVLProcessor(ProcessorMixin): attributes = ["image_processor", "tokenizer"] image_processor_class = "AutoImageProcessor" tokenizer_class = "AutoTokenizer" def __init__( self, image_processor, tokenizer, image_seq_length=256, max_dynamic_patch=12, min_dynamic_patch=1, use_thumbnail=True, chat_template=None, **kwargs, ): self.image_seq_length = image_seq_length self.max_dynamic_patch = max_dynamic_patch self.min_dynamic_patch = min_dynamic_patch self.use_thumbnail = use_thumbnail if chat_template is not None: tokenizer.chat_template = chat_template super().__init__(image_processor, tokenizer, **kwargs) def __call__( self, images: Optional[Union[Image.Image, List[Image.Image]]] = None, text: Optional[Union[str, List[str]]] = None, return_tensors: Optional[str] = None, **kwargs, ) -> BatchFeature: if text is None and images is None: raise ValueError("You must provide at least one of `text` or `images`.") data = {} num_patches_list = [] if images is not None: if isinstance(images, Image.Image): images = [images] image_size = self.image_processor.size.get( "height", self.image_processor.size.get("shortest_edge", 512) ) all_pixel_values = [] num_image = len(images) # Compute max patches per image from actual text token count. # Each image uses (max_num + 1) * image_seq_length + 2 tokens (thumbnail added when max_num > 1). if text is not None: text_without_images = text if isinstance(text, str) else text[0] text_without_images = text_without_images.replace("", "") text_tokens = len(self.tokenizer.encode(text_without_images, add_special_tokens=False)) else: text_tokens = 0 image_budget = self.tokenizer.model_max_length - text_tokens max_num = (image_budget // num_image - 2) // self.image_seq_length - 1 max_num = max(1, min(self.max_dynamic_patch, max_num)) for image in images: image = image.convert("RGB") patches = dynamic_preprocess( image, min_num=self.min_dynamic_patch, max_num=max_num, image_size=image_size, use_thumbnail=self.use_thumbnail, ) num_patches_list.append(len(patches)) pixel_values = self.image_processor( images=patches, return_tensors="pt" ).pixel_values all_pixel_values.append(pixel_values) data["pixel_values"] = torch.cat(all_pixel_values, dim=0) if text is not None: if isinstance(text, str): text = [text] expanded_texts = [] for t in text: for num_patches in num_patches_list: image_tokens = ( "<|image_start|>" + "<|image_pad|>" * self.image_seq_length * num_patches + "<|image_end|>" ) t = t.replace("", image_tokens, 1) expanded_texts.append(t) tokenized = self.tokenizer( expanded_texts if len(expanded_texts) > 1 else expanded_texts[0], return_tensors=return_tensors, add_special_tokens=False, **kwargs, ) data.update(tokenized) if num_patches_list: data["num_patches_list"] = num_patches_list return BatchFeature(data=data, tensor_type=return_tensors) def apply_chat_template( self, messages, tokenize=False, add_generation_prompt=False, return_dict=False, return_tensors=None, **kwargs, ): """Format messages and optionally process images + tokenize in one call. Supports structured content messages (Qwen3-VL style):: messages = [{"role": "user", "content": [ {"type": "image", "image": "path/to/img.png"}, {"type": "text", "text": "Describe this image."}, ]}] Plain string content is also supported:: messages = [{"role": "user", "content": "Hello"}] When ``tokenize=True`` and ``return_dict=True``, returns a :class:`~transformers.BatchFeature` with ``pixel_values``, ``input_ids``, and ``attention_mask`` that can be unpacked directly into ``model.generate(**inputs)``. """ # Extract images and flatten structured content to plain text messages images = [] flat_messages = [] for msg in messages: role = msg["role"] content = msg["content"] if isinstance(content, str): flat_messages.append({"role": role, "content": content}) elif isinstance(content, list): text_parts = [] for item in content: if item["type"] == "image": img = item["image"] if isinstance(img, str): images.append(Image.open(img).convert("RGB")) elif isinstance(img, Image.Image): images.append(img.convert("RGB")) text_parts.append("") elif item["type"] == "text": text_parts.append(item["text"]) flat_messages.append({"role": role, "content": "".join(text_parts)}) text = self.tokenizer.apply_chat_template( flat_messages, tokenize=False, add_special_tokens=False, add_generation_prompt=add_generation_prompt, ) text += "<|channel|>final<|message|>" if not tokenize: return text result = self( images=images if images else None, text=text, return_tensors=return_tensors, **kwargs, ) # Remove non-tensor metadata so **result works with model.generate() result.pop("num_patches_list", None) if return_dict: return result return result["input_ids"] def decode(self, token_ids, **kwargs): return self.tokenizer.decode(token_ids, **kwargs) def batch_decode(self, token_ids, **kwargs): return self.tokenizer.batch_decode(token_ids, **kwargs) @property def model_input_names(self): tokenizer_names = self.tokenizer.model_input_names image_processor_names = self.image_processor.model_input_names return list(dict.fromkeys(tokenizer_names + image_processor_names))