|
|
""" |
|
|
Processor for Gemma3Tiled that handles tokenization with tiled images. |
|
|
|
|
|
This processor generates the correct number of image placeholder tokens |
|
|
based on the tile grid dimensions. |
|
|
""" |
|
|
|
|
|
import re |
|
|
from typing import Optional, Union |
|
|
|
|
|
import torch |
|
|
import numpy as np |
|
|
|
|
|
from transformers.feature_extraction_utils import BatchFeature |
|
|
from transformers.image_utils import ImageInput, make_nested_list_of_images |
|
|
from transformers.processing_utils import ProcessingKwargs, ProcessorMixin, Unpack, ImagesKwargs |
|
|
from transformers.tokenization_utils_base import PreTokenizedInput, TextInput |
|
|
|
|
|
|
|
|
class Gemma3TiledImagesKwargs(ImagesKwargs): |
|
|
tile_size: Optional[int] |
|
|
max_tiles_h: Optional[int] |
|
|
max_tiles_w: Optional[int] |
|
|
min_tiles: Optional[int] |
|
|
do_convert_rgb: Optional[bool] |
|
|
|
|
|
|
|
|
class Gemma3TiledProcessorKwargs(ProcessingKwargs, total=False): |
|
|
images_kwargs: Gemma3TiledImagesKwargs |
|
|
_defaults = { |
|
|
"text_kwargs": { |
|
|
"padding": False, |
|
|
"return_mm_token_type_ids": True, |
|
|
}, |
|
|
"images_kwargs": { |
|
|
"do_convert_rgb": True, |
|
|
"tile_size": 896, |
|
|
"max_tiles_h": 4, |
|
|
"max_tiles_w": 4, |
|
|
"min_tiles": 1, |
|
|
}, |
|
|
} |
|
|
|
|
|
|
|
|
class Gemma3TiledProcessor(ProcessorMixin): |
|
|
""" |
|
|
Processor for Gemma3Tiled that handles tokenization with tiled images. |
|
|
|
|
|
The key difference from Gemma3Processor is that instead of a fixed |
|
|
256 tokens per image, we generate (grid_h * 16) * (grid_w * 16) + (grid_h * 16 - 1) |
|
|
tokens per image, where the extra tokens are for linebreak embeddings. |
|
|
""" |
|
|
|
|
|
attributes = ["image_processor", "tokenizer"] |
|
|
image_processor_class = "AutoImageProcessor" |
|
|
tokenizer_class = "AutoTokenizer" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
image_processor, |
|
|
tokenizer, |
|
|
chat_template=None, |
|
|
tokens_per_tile: int = 256, |
|
|
**kwargs, |
|
|
): |
|
|
self.tokens_per_tile = tokens_per_tile |
|
|
self.tokens_per_tile_side = int(tokens_per_tile ** 0.5) |
|
|
|
|
|
self.image_token_id = tokenizer.image_token_id |
|
|
self.boi_token = tokenizer.boi_token |
|
|
self.eoi_token = getattr(tokenizer, 'eoi_token', '</image>') |
|
|
self.image_token = tokenizer.image_token |
|
|
|
|
|
super().__init__( |
|
|
image_processor=image_processor, |
|
|
tokenizer=tokenizer, |
|
|
chat_template=chat_template, |
|
|
**kwargs, |
|
|
) |
|
|
|
|
|
def get_num_image_tokens(self, grid_h: int, grid_w: int) -> int: |
|
|
""" |
|
|
Calculate total image tokens needed for a tile grid. |
|
|
|
|
|
For a grid_h x grid_w grid of tiles: |
|
|
- Image tokens: (grid_h * 16) * (grid_w * 16) = grid_h * grid_w * 256 |
|
|
- Linebreak tokens: (grid_h * 16 - 1) = one after each row except the last |
|
|
|
|
|
Total = grid_h * grid_w * 256 + grid_h * 16 - 1 |
|
|
""" |
|
|
rows = grid_h * self.tokens_per_tile_side |
|
|
cols = grid_w * self.tokens_per_tile_side |
|
|
|
|
|
img_tokens = rows * cols |
|
|
linebreak_tokens = rows - 1 |
|
|
|
|
|
return img_tokens + linebreak_tokens |
|
|
|
|
|
def build_image_token_sequence(self, grid_h: int, grid_w: int) -> str: |
|
|
""" |
|
|
Build the image token sequence for a tiled image. |
|
|
|
|
|
Returns a string like: |
|
|
\n\n<boi><img>×(16*grid_w)<img>×(16*grid_w)...(×16*grid_h rows)...<eoi>\n\n |
|
|
|
|
|
Note: We use <img> tokens for BOTH actual image positions AND linebreak positions. |
|
|
The model will replace them with the appropriate embeddings. |
|
|
""" |
|
|
rows = grid_h * self.tokens_per_tile_side |
|
|
cols = grid_w * self.tokens_per_tile_side |
|
|
|
|
|
total_tokens = self.get_num_image_tokens(grid_h, grid_w) |
|
|
image_tokens = self.image_token * total_tokens |
|
|
|
|
|
return f"\n\n{self.boi_token}{image_tokens}{self.eoi_token}\n\n" |
|
|
|
|
|
def __call__( |
|
|
self, |
|
|
images: Optional[ImageInput] = None, |
|
|
text: Union[TextInput, PreTokenizedInput, list[TextInput], list[PreTokenizedInput]] = None, |
|
|
videos=None, |
|
|
audio=None, |
|
|
**kwargs: Unpack[Gemma3TiledProcessorKwargs], |
|
|
) -> BatchFeature: |
|
|
if text is None and images is None: |
|
|
raise ValueError("Provide at least one of `text` or `images`.") |
|
|
|
|
|
output_kwargs = self._merge_kwargs( |
|
|
Gemma3TiledProcessorKwargs, |
|
|
tokenizer_init_kwargs=self.tokenizer.init_kwargs, |
|
|
**kwargs, |
|
|
) |
|
|
|
|
|
if isinstance(text, str): |
|
|
text = [text] |
|
|
elif not isinstance(text, list) and not isinstance(text[0], str): |
|
|
raise TypeError("Invalid input text. Please provide a string, or a list of strings") |
|
|
|
|
|
image_inputs = {} |
|
|
if images is not None: |
|
|
|
|
|
images_fetched = self.image_processor.fetch_images(images) if hasattr(self.image_processor, 'fetch_images') else images |
|
|
batched_images = make_nested_list_of_images(images_fetched) |
|
|
|
|
|
|
|
|
image_inputs = self.image_processor(images_fetched, **output_kwargs["images_kwargs"]) |
|
|
|
|
|
|
|
|
tile_grid_shapes = list(image_inputs.get("tile_grid_shape", [])) |
|
|
|
|
|
|
|
|
if not text: |
|
|
text = [" ".join([self.boi_token] * len(imgs)) for imgs in batched_images] |
|
|
|
|
|
if len(batched_images) != len(text): |
|
|
raise ValueError( |
|
|
f"Received inconsistently sized batches of images ({len(batched_images)}) and text ({len(text)})." |
|
|
) |
|
|
|
|
|
|
|
|
all_grid_shapes = [] |
|
|
grid_shape_iter = iter(tile_grid_shapes) |
|
|
for imgs in batched_images: |
|
|
for _ in imgs: |
|
|
try: |
|
|
all_grid_shapes.append(next(grid_shape_iter)) |
|
|
except StopIteration: |
|
|
|
|
|
all_grid_shapes.append((1, 1)) |
|
|
|
|
|
|
|
|
grid_shape_idx = 0 |
|
|
for batch_idx, (prompt, imgs) in enumerate(zip(text, batched_images)): |
|
|
image_indexes = [m.start() for m in re.finditer(re.escape(self.boi_token), prompt)] |
|
|
|
|
|
if len(imgs) != len(image_indexes): |
|
|
raise ValueError( |
|
|
f"Prompt contained {len(image_indexes)} image tokens but received {len(imgs)} images." |
|
|
) |
|
|
|
|
|
|
|
|
batch_grid_shapes = all_grid_shapes[grid_shape_idx:grid_shape_idx + len(imgs)] |
|
|
grid_shape_idx += len(imgs) |
|
|
|
|
|
|
|
|
|
|
|
for idx, (grid_h, grid_w) in zip(reversed(image_indexes), reversed(batch_grid_shapes)): |
|
|
image_sequence = self.build_image_token_sequence(grid_h, grid_w) |
|
|
prompt = prompt[:idx] + image_sequence + prompt[idx + len(self.boi_token):] |
|
|
|
|
|
text[batch_idx] = prompt |
|
|
|
|
|
return_tensors = output_kwargs["text_kwargs"].pop("return_tensors", None) |
|
|
return_mm_token_type_ids = output_kwargs["text_kwargs"].pop("return_mm_token_type_ids", False) |
|
|
|
|
|
|
|
|
text_inputs = self.tokenizer(text=text, return_tensors=return_tensors, **output_kwargs["text_kwargs"]) |
|
|
|
|
|
|
|
|
if return_mm_token_type_ids: |
|
|
if return_tensors == "pt": |
|
|
input_ids = text_inputs["input_ids"] |
|
|
mm_token_type_ids = torch.zeros_like(input_ids) |
|
|
mm_token_type_ids[input_ids == self.image_token_id] = 1 |
|
|
text_inputs["token_type_ids"] = mm_token_type_ids |
|
|
else: |
|
|
array_ids = np.array(text_inputs["input_ids"]) |
|
|
mm_token_type_ids = np.zeros_like(array_ids) |
|
|
mm_token_type_ids[array_ids == self.image_token_id] = 1 |
|
|
text_inputs["token_type_ids"] = mm_token_type_ids.tolist() |
|
|
|
|
|
|
|
|
|
|
|
return BatchFeature(data={**text_inputs, **image_inputs}) |
|
|
|
|
|
@property |
|
|
def model_input_names(self): |
|
|
tokenizer_input_names = self.tokenizer.model_input_names + ["token_type_ids"] |
|
|
image_processor_input_names = self.image_processor.model_input_names |
|
|
return list(set(tokenizer_input_names + image_processor_input_names)) |
|
|
|
|
|
|
|
|
__all__ = ["Gemma3TiledProcessor", "Gemma3TiledProcessorKwargs"] |
|
|
|