import os
import json
from typing import List, Optional, Union, Dict

import numpy as np
import torch
import cv2
import albumentations as A
from PIL import Image
from torchvision import transforms as T

from transformers import ProcessorMixin, BaseImageProcessor
from transformers.tokenization_utils_base import BatchEncoding
from transformers.image_utils import ImageInput
from transformers.utils import TensorType, logging

logger = logging.get_logger(__name__)


class NemotronParseImageProcessor(BaseImageProcessor):
    """
    Image processor for the NemotronParse model.

    Inherits from BaseImageProcessor (which already includes
    ImageProcessingMixin), so it works with transformers' AutoImageProcessor.
    """

    model_input_names = ["pixel_values"]

    def __init__(
        self,
        final_size: tuple = (2048, 1664),
        **kwargs,
    ):
        # Drop private keys and the non-serializable transform objects that can
        # leak back in when a saved config is reloaded.
        clean_kwargs = {
            k: v
            for k, v in kwargs.items()
            if not k.startswith("_") and k not in ("transform", "torch_transform")
        }

        # Accept the standard transformers `size` dict as an alias for `final_size`.
        if "size" in clean_kwargs:
            size_config = clean_kwargs.pop("size")
            if isinstance(size_config, dict):
                if "longest_edge" in size_config:
                    longest_edge = size_config["longest_edge"]
                    if isinstance(longest_edge, (list, tuple)):
                        final_size = tuple(int(x) for x in longest_edge)
                    else:
                        final_size = (int(longest_edge), int(longest_edge))
                elif "height" in size_config and "width" in size_config:
                    final_size = (int(size_config["height"]), int(size_config["width"]))

        super().__init__(**clean_kwargs)

        # Normalize final_size to a (height, width) tuple of ints.
        if isinstance(final_size, (list, tuple)) and len(final_size) >= 2:
            self.final_size = (int(final_size[0]), int(final_size[1]))
        elif isinstance(final_size, (int, float)):
            self.final_size = (int(final_size), int(final_size))
        else:
            self.final_size = (2048, 1664)

        self._create_transforms()

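    # For illustration, each of the following yields final_size == (2048, 1664):
    #   NemotronParseImageProcessor(final_size=(2048, 1664))
    #   NemotronParseImageProcessor(size={"longest_edge": [2048, 1664]})
    #   NemotronParseImageProcessor(size={"height": 2048, "width": 1664})
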
    def _create_transforms(self):
        """Create transform objects (not serialized to JSON)."""
        if isinstance(self.final_size, (list, tuple)):
            self.target_height, self.target_width = int(self.final_size[0]), int(self.final_size[1])
        else:
            self.target_height = self.target_width = int(self.final_size)

        # Pad with white up to the target size; aspect-ratio resizing happens
        # first, in preprocess().
        self.transform = A.Compose([
            A.PadIfNeeded(
                min_height=self.target_height,
                min_width=self.target_width,
                border_mode=cv2.BORDER_CONSTANT,
                value=[255, 255, 255],
                p=1.0,
            ),
        ])

        # HWC uint8 -> CHW float32 in [0, 1].
        self.torch_transform = T.Compose([
            T.ToTensor(),
        ])

    def to_dict(self):
        """Override to exclude the non-serializable transforms."""
        output = super().to_dict()
        output.pop("transform", None)
        output.pop("torch_transform", None)
        return output

    @classmethod
    def from_dict(cls, config_dict: dict, **kwargs):
        """Override to recreate the transforms after loading."""
        config_dict = config_dict.copy()
        config_dict.pop("transform", None)
        config_dict.pop("torch_transform", None)

        # Drop private and empty entries left over from serialization.
        for key in list(config_dict.keys()):
            if key.startswith("_") or config_dict[key] is None:
                config_dict.pop(key, None)

        # JSON round-trips tuples as lists; normalize back to a tuple.
        if "final_size" in config_dict:
            final_size = config_dict["final_size"]
            if isinstance(final_size, (list, tuple)):
                config_dict["final_size"] = tuple(int(x) for x in final_size)

        try:
            return cls(**config_dict, **kwargs)
        except Exception as e:
            logger.warning(f"Error in from_dict ({e}); falling back to default parameters.")
            return cls(**kwargs)

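    # Round-trip sketch: from_dict(processor.to_dict()) rebuilds an equivalent
    # processor; the albumentations/torchvision transforms are reconstructed in
    # __init__ rather than deserialized.
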
    def save_pretrained(self, save_directory, **kwargs):
        """Save the image processor configuration."""
        os.makedirs(save_directory, exist_ok=True)

        config = {
            "feature_extractor_type": "NemotronParseImageProcessor",
            "image_processor_type": "NemotronParseImageProcessor",
            "processor_class": "NemotronParseImageProcessor",
            "size": {
                "height": self.final_size[0],
                "width": self.final_size[1],
                "longest_edge": list(self.final_size),
            },
            "final_size": list(self.final_size),
        }

        config_path = os.path.join(save_directory, "preprocessor_config.json")
        with open(config_path, "w") as f:
            json.dump(config, f, indent=2)

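    # With the defaults, the emitted preprocessor_config.json looks like:
    # {
    #   "feature_extractor_type": "NemotronParseImageProcessor",
    #   "image_processor_type": "NemotronParseImageProcessor",
    #   "processor_class": "NemotronParseImageProcessor",
    #   "size": {"height": 2048, "width": 1664, "longest_edge": [2048, 1664]},
    #   "final_size": [2048, 1664]
    # }
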
    def _resize_with_aspect_ratio(self, image: np.ndarray) -> np.ndarray:
        """Resize the image preserving aspect ratio (replica of the original LongestMaxSizeHW)."""
        height, width = image.shape[:2]
        max_size_height = self.target_height
        max_size_width = self.target_width

        aspect_ratio = width / height
        new_height = height
        new_width = width

        # Cap the height first, then re-check that the resulting width still fits.
        if height > max_size_height:
            new_height = max_size_height
            new_width = int(new_height * aspect_ratio)

        if new_width > max_size_width:
            new_width = max_size_width
            new_height = int(new_width / aspect_ratio)

        if (new_width, new_height) == (width, height):
            return image  # already fits; skip the no-op resize

        return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_LINEAR)

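    # Worked example of the two-pass cap above, with the default 2048x1664
    # (height x width) target:
    #   a 3000x1500 scan -> pass 1: 3000 > 2048, so resize to 2048x1024
    #                    -> pass 2: 1024 <= 1664, unchanged
    # The 2048x1024 result is then white-padded to 2048x1664.
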
    def _pad_to_size(self, image: np.ndarray) -> np.ndarray:
        """Pad the image to the target size with white (fallback matching A.PadIfNeeded)."""
        h, w = image.shape[:2]
        pad_h = max(0, self.target_height - h)
        pad_w = max(0, self.target_width - w)

        if pad_h == 0 and pad_w == 0:
            return image

        # A.PadIfNeeded centers the image by default (position="center"), so
        # split the padding evenly rather than padding only bottom/right.
        top, bottom = pad_h // 2, pad_h - pad_h // 2
        left, right = pad_w // 2, pad_w - pad_w // 2

        if image.ndim == 3:
            pad_width = ((top, bottom), (left, right), (0, 0))
        else:
            pad_width = ((top, bottom), (left, right))

        return np.pad(image, pad_width, mode="constant", constant_values=255)

    def preprocess(
        self,
        images: ImageInput,
        return_tensors: Optional[Union[str, TensorType]] = None,
        **kwargs,
    ) -> Dict[str, torch.Tensor]:
        """
        Preprocess an image or batch of images for the NemotronParse model.

        Args:
            images: Input image(s), as PIL images or numpy arrays.
            return_tensors: Type of tensors to return ("pt" or "np").
        """
        if not isinstance(images, list):
            images = [images]

        # Convert PIL images to RGB numpy arrays without mutating the caller's
        # list; numpy arrays pass through unchanged.
        processed_images = []
        for image in images:
            if isinstance(image, Image.Image):
                image = np.asarray(image.convert("RGB"))
            processed_images.append(image)

        pixel_values = []
        for image in processed_images:
            # Downscale to fit the target box, then pad with white to the exact size.
            processed_image = self._resize_with_aspect_ratio(image)

            if self.transform is not None:
                transformed = self.transform(image=processed_image)
                processed_image = transformed["image"]
            else:
                processed_image = self._pad_to_size(processed_image)

            pixel_values_tensor = self.torch_transform(processed_image)

            # Promote single-channel (grayscale) inputs to three channels.
            if pixel_values_tensor.shape[0] == 1:
                pixel_values_tensor = pixel_values_tensor.expand(3, -1, -1)

            pixel_values.append(pixel_values_tensor)

        pixel_values = torch.stack(pixel_values)

        data = {"pixel_values": pixel_values}

        if return_tensors is not None:
            data = self._convert_output_format(data, return_tensors)

        return data

    def _convert_output_format(self, data: Dict[str, torch.Tensor], return_tensors: Union[str, TensorType]) -> Dict:
        """Convert the output format based on the return_tensors parameter."""
        if return_tensors in ("pt", TensorType.PYTORCH):
            return data
        elif return_tensors in ("np", TensorType.NUMPY):
            return {k: v.numpy() for k, v in data.items()}
        else:
            return data

    def __call__(self, images: Union[Image.Image, List[Image.Image]], **kwargs) -> Dict[str, torch.Tensor]:
        """Process images for the model (backward compatibility)."""
        return self.preprocess(images, **kwargs)


class NemotronParseProcessor(ProcessorMixin):
    """Combines the NemotronParse image processor and a tokenizer into one processor."""

    attributes = ["image_processor", "tokenizer"]
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = ("PreTrainedTokenizer", "PreTrainedTokenizerFast")

    def __init__(self, image_processor=None, tokenizer=None, **kwargs):
        if image_processor is None:
            image_processor = NemotronParseImageProcessor(**kwargs)

        super().__init__(image_processor, tokenizer)

    def __call__(
        self,
        images: Union[Image.Image, List[Image.Image]] = None,
        text: Union[str, List[str]] = None,
        add_special_tokens: bool = True,
        padding: Union[bool, str] = False,
        truncation: Union[bool, str] = False,
        max_length: Optional[int] = None,
        stride: int = 0,
        pad_to_multiple_of: Optional[int] = None,
        return_attention_mask: Optional[bool] = None,
        return_overflowing_tokens: bool = False,
        return_special_tokens_mask: bool = False,
        return_offsets_mapping: bool = False,
        return_token_type_ids: bool = False,
        return_length: bool = False,
        verbose: bool = True,
        return_tensors: Optional[Union[str, "TensorType"]] = None,
        **kwargs,
    ) -> BatchEncoding:
        """
        Main method to prepare one or several text(s) and image(s) for the model.
        """
        if images is not None:
            # Forward return_tensors so image and text outputs use the same format.
            image_inputs = self.image_processor(images, return_tensors=return_tensors, **kwargs)
        else:
            image_inputs = {}

        if text is not None:
            text_inputs = self.tokenizer(
                text,
                add_special_tokens=add_special_tokens,
                padding=padding,
                truncation=truncation,
                max_length=max_length,
                stride=stride,
                pad_to_multiple_of=pad_to_multiple_of,
                return_attention_mask=return_attention_mask,
                return_overflowing_tokens=return_overflowing_tokens,
                return_special_tokens_mask=return_special_tokens_mask,
                return_offsets_mapping=return_offsets_mapping,
                return_token_type_ids=return_token_type_ids,
                return_length=return_length,
                verbose=verbose,
                return_tensors=return_tensors,
                **kwargs,
            )
        else:
            text_inputs = {}

        return BatchEncoding({**image_inputs, **text_inputs})

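    # Typical usage (a sketch; the checkpoint path and prompt are placeholders):
    #   processor = NemotronParseProcessor.from_pretrained("path/to/checkpoint")
    #   batch = processor(images=page_image, text="<s>", return_tensors="pt")
    #   batch["pixel_values"], batch["input_ids"], batch["attention_mask"]
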
    def decode(self, *args, **kwargs):
        """Decode token ids to strings."""
        return self.tokenizer.decode(*args, **kwargs)

    def batch_decode(self, *args, **kwargs):
        """Batch-decode token ids to strings."""
        return self.tokenizer.batch_decode(*args, **kwargs)

    def post_process_generation(self, sequences, fix_markdown=False):
        """Post-process generated sequences."""
        if hasattr(self.tokenizer, "post_process_generation"):
            return self.tokenizer.post_process_generation(sequences, fix_markdown=fix_markdown)

        # Fallback: strip BOS/EOS markers and surrounding whitespace.
        if isinstance(sequences, str):
            sequences = [sequences]

        processed = []
        for seq in sequences:
            seq = seq.replace("<s>", "").replace("</s>", "").strip()
            processed.append(seq)

        return processed[0] if len(processed) == 1 else processed

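    # e.g. with a tokenizer that lacks post_process_generation:
    #   post_process_generation("<s>## Section 1</s>")     -> "## Section 1"
    #   post_process_generation(["<s>a</s>", "<s>b</s>"])  -> ["a", "b"]
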
    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, **kwargs):
        """
        Load the processor from a pretrained model.

        Compatible with AutoProcessor.from_pretrained().
        """
        from transformers import AutoImageProcessor, AutoTokenizer

        # Forward all loading kwargs (trust_remote_code, revision, token,
        # cache_dir, ...) instead of silently dropping most of them.
        image_processor = AutoImageProcessor.from_pretrained(pretrained_model_name_or_path, **kwargs)
        tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path, **kwargs)
        return cls(image_processor=image_processor, tokenizer=tokenizer)

    def save_pretrained(self, save_directory, **kwargs):
        """
        Save the processor to a directory.

        The result is loadable via AutoProcessor/AutoImageProcessor/AutoTokenizer.
        """
        os.makedirs(save_directory, exist_ok=True)

        # Save the tokenizer so AutoTokenizer.from_pretrained() works on the directory.
        self.tokenizer.save_pretrained(save_directory, **kwargs)

        # Save the image processor config (preprocessor_config.json).
        self.image_processor.save_pretrained(save_directory, **kwargs)

        super().save_pretrained(save_directory, **kwargs)
        logger.info(f"NemotronParseProcessor saved to {save_directory}")
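
if __name__ == "__main__":
    # Minimal smoke test, a sketch rather than part of the shipped API: build
    # the image processor directly (no checkpoint needed) and run a blank
    # white page through it.
    processor = NemotronParseImageProcessor(final_size=(2048, 1664))
    page = Image.new("RGB", (1200, 1600), color="white")  # PIL size is (width, height)
    out = processor(page, return_tensors="pt")
    print(out["pixel_values"].shape)  # expected: torch.Size([1, 3, 2048, 1664])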