Jagle-VL-2.2B-FineVision / processing_llmjpvl.py
speed's picture
Upload processing_llmjpvl.py with huggingface_hub
e4ccf48 verified
"""LLM-jp-VL Processor — combines SigLIP image processing + dynamic patching + tokenization."""
from typing import List, Optional, Union
import torch
from PIL import Image
from transformers import BatchFeature, ProcessorMixin
def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size):
    """Pick the tile grid whose aspect ratio is nearest to the image's.

    Args:
        aspect_ratio: Width / height of the source image.
        target_ratios: Iterable of candidate grids; element 0 is the number
            of tile columns and element 1 the number of tile rows.
        width: Source image width in pixels.
        height: Source image height in pixels.
        image_size: Edge length of one square tile in pixels.

    Returns:
        The winning candidate, or ``(1, 1)`` when no candidate improves on
        an infinite distance (for example when ``target_ratios`` is empty).
    """
    source_area = width * height
    chosen = (1, 1)
    smallest_gap = float("inf")

    for candidate in target_ratios:
        gap = abs(aspect_ratio - candidate[0] / candidate[1])

        if gap < smallest_gap:
            # Strictly closer: always take it.
            smallest_gap, chosen = gap, candidate
            continue

        # On an exact tie, switch to this candidate only when the source
        # image covers more than half of the candidate grid's pixel area.
        is_tie = gap == smallest_gap
        if is_tie and source_area > 0.5 * image_size * image_size * candidate[0] * candidate[1]:
            chosen = candidate

    return chosen
def dynamic_preprocess(
    image, min_num=1, max_num=12, image_size=512, use_thumbnail=False
):
    """Resize an image onto a tile grid and cut it into square tiles.

    Every grid ``(cols, rows)`` with ``min_num <= cols * rows <= max_num`` is
    a candidate; the one selected by ``find_closest_aspect_ratio`` is used.

    Args:
        image: Object exposing ``size``, ``resize`` and ``crop`` (a PIL image
            in this file's usage).
        min_num: Minimum number of tiles in a candidate grid.
        max_num: Maximum number of tiles in a candidate grid.
        image_size: Edge length of each square tile in pixels.
        use_thumbnail: When true, append the whole image resized to a single
            tile, unless exactly one tile was produced.

    Returns:
        List of tiles in row-major order, optionally followed by the
        thumbnail.
    """
    src_w, src_h = image.size

    # Built with the same nesting and container as before so the stable sort
    # below sees equal-area grids in the same order.
    candidate_grids = set(
        (cols, rows)
        for total in range(min_num, max_num + 1)
        for cols in range(1, total + 1)
        for rows in range(1, total + 1)
        if min_num <= cols * rows <= max_num
    )
    candidate_grids = sorted(candidate_grids, key=lambda grid: grid[0] * grid[1])

    grid = find_closest_aspect_ratio(
        src_w / src_h, candidate_grids, src_w, src_h, image_size
    )

    canvas_w = image_size * grid[0]
    canvas_h = image_size * grid[1]
    canvas = image.resize((canvas_w, canvas_h))

    tiles_per_row = canvas_w // image_size
    tiles = []
    for index in range(grid[0] * grid[1]):
        col = index % tiles_per_row
        row = index // tiles_per_row
        left = col * image_size
        top = row * image_size
        right = (col + 1) * image_size
        bottom = (row + 1) * image_size
        tiles.append(canvas.crop((left, top, right, bottom)))

    if use_thumbnail and len(tiles) != 1:
        tiles.append(image.resize((image_size, image_size)))
    return tiles
class LLMjpVLProcessor(ProcessorMixin):
    """Processor pairing an image processor with a tokenizer for LLM-jp-VL.

    Each input image is cut into square tiles by ``dynamic_preprocess``, the
    tiles are run through ``image_processor``, and every ``<image>``
    placeholder in the text is expanded to
    ``<|image_start|>`` + N * ``<|image_pad|>`` + ``<|image_end|>`` where
    ``N = image_seq_length * number_of_tiles`` for the matching image.
    """

    attributes = ["image_processor", "tokenizer"]
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self,
        image_processor,
        tokenizer,
        image_seq_length=256,
        max_dynamic_patch=12,
        min_dynamic_patch=1,
        use_thumbnail=True,
        chat_template=None,
        **kwargs,
    ):
        """Store tiling settings and register the sub-processors.

        Args:
            image_processor: Image processor applied to each tile.
            tokenizer: Tokenizer used for the expanded text.
            image_seq_length: Number of ``<|image_pad|>`` tokens per tile.
            max_dynamic_patch: Upper bound on tiles per image (before the
                optional thumbnail).
            min_dynamic_patch: Lower bound on tiles per image.
            use_thumbnail: Whether ``dynamic_preprocess`` may append a
                thumbnail tile.
            chat_template: If given, assigned to ``tokenizer.chat_template``.
            **kwargs: Forwarded to ``ProcessorMixin.__init__``.
        """
        self.image_seq_length = image_seq_length
        self.max_dynamic_patch = max_dynamic_patch
        self.min_dynamic_patch = min_dynamic_patch
        self.use_thumbnail = use_thumbnail
        if chat_template is not None:
            tokenizer.chat_template = chat_template
        super().__init__(image_processor, tokenizer, **kwargs)

    def _max_patches_per_image(self, text, num_images):
        """Return how many tiles each image may use within the token budget.

        Each image is budgeted ``(max_num + 1) * image_seq_length + 2`` tokens
        (tiles plus one thumbnail, plus the start/end markers). The text cost
        is taken from the longest prompt in the batch, with ``<image>``
        placeholders removed, because every prompt receives all images.
        The result is clamped to ``[1, max_dynamic_patch]``.
        """
        if text is None:
            text_tokens = 0
        else:
            texts = [text] if isinstance(text, str) else text
            text_tokens = max(
                len(
                    self.tokenizer.encode(
                        t.replace("<image>", ""), add_special_tokens=False
                    )
                )
                for t in texts
            )
        image_budget = self.tokenizer.model_max_length - text_tokens
        max_num = (image_budget // num_images - 2) // self.image_seq_length - 1
        return max(1, min(self.max_dynamic_patch, max_num))

    def _expand_image_placeholders(self, text, num_patches_list):
        """Replace successive ``<image>`` placeholders with pad-token runs.

        The k-th placeholder is expanded using the k-th entry of
        ``num_patches_list``; surplus placeholders or surplus images are left
        as they are.
        """
        for num_patches in num_patches_list:
            image_tokens = (
                "<|image_start|>"
                + "<|image_pad|>" * self.image_seq_length * num_patches
                + "<|image_end|>"
            )
            text = text.replace("<image>", image_tokens, 1)
        return text

    def __call__(
        self,
        images: Optional[Union[Image.Image, List[Image.Image]]] = None,
        text: Optional[Union[str, List[str]]] = None,
        return_tensors: Optional[str] = None,
        **kwargs,
    ) -> BatchFeature:
        """Tile images, expand ``<image>`` placeholders and tokenize.

        Args:
            images: One PIL image or a list of them. An empty list is treated
                the same as ``None``.
            text: One prompt or a list of prompts containing ``<image>``
                placeholders.
            return_tensors: Tensor type forwarded to the tokenizer and to
                ``BatchFeature``.
            **kwargs: Extra keyword arguments forwarded to the tokenizer.

        Returns:
            A ``BatchFeature`` holding ``pixel_values`` (all tiles of all
            images concatenated on dim 0) and ``num_patches_list`` when
            images were given, plus the tokenizer outputs when text was given.

        Raises:
            ValueError: If neither images nor text is provided, or ``text``
                is an empty list.
        """
        if isinstance(images, Image.Image):
            images = [images]
        elif images is not None and len(images) == 0:
            # No images to tile; avoids dividing the budget by zero below.
            images = None

        if text is None and images is None:
            raise ValueError("You must provide at least one of `text` or `images`.")
        if text is not None and not isinstance(text, str) and len(text) == 0:
            raise ValueError("`text` must not be an empty list.")

        data = {}
        num_patches_list = []

        if images is not None:
            image_size = self.image_processor.size.get(
                "height", self.image_processor.size.get("shortest_edge", 512)
            )
            max_num = self._max_patches_per_image(text, len(images))

            all_pixel_values = []
            for image in images:
                patches = dynamic_preprocess(
                    image.convert("RGB"),
                    min_num=self.min_dynamic_patch,
                    max_num=max_num,
                    image_size=image_size,
                    use_thumbnail=self.use_thumbnail,
                )
                num_patches_list.append(len(patches))
                pixel_values = self.image_processor(
                    images=patches, return_tensors="pt"
                ).pixel_values
                all_pixel_values.append(pixel_values)
            data["pixel_values"] = torch.cat(all_pixel_values, dim=0)

        if text is not None:
            texts = [text] if isinstance(text, str) else text
            expanded_texts = [
                self._expand_image_placeholders(t, num_patches_list) for t in texts
            ]
            # A single prompt is passed as a plain string so the tokenizer
            # output keeps its unbatched shape when no tensors are requested.
            tokenized = self.tokenizer(
                expanded_texts if len(expanded_texts) > 1 else expanded_texts[0],
                return_tensors=return_tensors,
                add_special_tokens=False,
                **kwargs,
            )
            data.update(tokenized)

        if num_patches_list:
            data["num_patches_list"] = num_patches_list
        return BatchFeature(data=data, tensor_type=return_tensors)

    def apply_chat_template(
        self,
        messages,
        tokenize=False,
        add_generation_prompt=False,
        return_dict=False,
        return_tensors=None,
        **kwargs,
    ):
        """Format messages and optionally process images + tokenize in one call.

        Supports structured content messages (Qwen3-VL style)::

            messages = [{"role": "user", "content": [
                {"type": "image", "image": "path/to/img.png"},
                {"type": "text", "text": "Describe this image."},
            ]}]

        Plain string content is also supported::

            messages = [{"role": "user", "content": "Hello"}]

        When ``tokenize=True`` and ``return_dict=True``, returns a
        :class:`~transformers.BatchFeature` with ``pixel_values``,
        ``input_ids``, and ``attention_mask`` that can be unpacked directly
        into ``model.generate(**inputs)``.

        With ``tokenize=False`` the formatted prompt string is returned; with
        ``tokenize=True`` and ``return_dict=False`` only ``input_ids`` is
        returned. Extra ``**kwargs`` are forwarded to ``__call__``.
        """
        # Extract images and flatten structured content to plain text messages.
        images = []
        flat_messages = []
        for msg in messages:
            role = msg["role"]
            content = msg["content"]
            if isinstance(content, str):
                flat_messages.append({"role": role, "content": content})
            elif isinstance(content, list):
                text_parts = []
                for item in content:
                    if item["type"] == "image":
                        img = item["image"]
                        if isinstance(img, str):
                            # convert() returns an independent copy, so the
                            # source file can be closed right away.
                            with Image.open(img) as opened:
                                images.append(opened.convert("RGB"))
                        elif isinstance(img, Image.Image):
                            images.append(img.convert("RGB"))
                        # Any other value still yields a placeholder but
                        # contributes no image.
                        text_parts.append("<image>")
                    elif item["type"] == "text":
                        text_parts.append(item["text"])
                flat_messages.append({"role": role, "content": "".join(text_parts)})

        text = self.tokenizer.apply_chat_template(
            flat_messages,
            tokenize=False,
            add_special_tokens=False,
            add_generation_prompt=add_generation_prompt,
        )
        # NOTE(review): appended regardless of add_generation_prompt —
        # confirm this is intended for non-generation use.
        text += "<|channel|>final<|message|>"

        if not tokenize:
            return text

        result = self(
            images=images if images else None,
            text=text,
            return_tensors=return_tensors,
            **kwargs,
        )
        # Remove non-tensor metadata so **result works with model.generate()
        result.pop("num_patches_list", None)
        if return_dict:
            return result
        return result["input_ids"]

    def decode(self, token_ids, **kwargs):
        """Delegate to ``tokenizer.decode``."""
        return self.tokenizer.decode(token_ids, **kwargs)

    def batch_decode(self, token_ids, **kwargs):
        """Delegate to ``tokenizer.batch_decode``."""
        return self.tokenizer.batch_decode(token_ids, **kwargs)

    @property
    def model_input_names(self):
        """Tokenizer input names followed by image-processor ones, de-duplicated in order."""
        tokenizer_names = self.tokenizer.model_input_names
        image_processor_names = self.image_processor.model_input_names
        return list(dict.fromkeys(tokenizer_names + image_processor_names))