|
|
""" |
|
|
Oculus Processor |
|
|
|
|
|
Handles image and text preprocessing for the Oculus model. |
|
|
""" |
|
|
|
|
|
from typing import Optional, Union, List, Dict, Any, Tuple


from PIL import Image


import numpy as np


from transformers import ProcessorMixin, BatchFeature


from transformers.image_utils import ImageInput
|
|
|
|
|
|
|
|
class OculusProcessor(ProcessorMixin):
    """
    Processor for Oculus model.

    Combines image processing and text tokenization.

    Usage:
    ```python
    processor = OculusProcessor.from_pretrained("OceanirAI/oculus-0.2")

    # Process inputs
    inputs = processor(
        images=image,
        text="What is in this image?",
        mode="text",
        return_tensors="pt"
    )
    ```
    """

    attributes = ["image_processor", "tokenizer"]
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self,
        image_processor=None,
        tokenizer=None,
        **kwargs
    ):
        """
        Args:
            image_processor: Image preprocessor (e.g. an ``AutoImageProcessor``).
            tokenizer: Text tokenizer (e.g. an ``AutoTokenizer``).
            **kwargs: Optional special-token overrides: ``thinking_token``,
                ``thinking_end_token``, ``focus_token``, ``focus_end_token``.
        """
        super().__init__(image_processor, tokenizer)
        self.image_processor = image_processor
        self.tokenizer = tokenizer

        # Delimiters wrapping the model's optional reasoning trace.
        self.thinking_token = kwargs.get("thinking_token", "<think>")
        self.thinking_end_token = kwargs.get("thinking_end_token", "</think>")
        # Delimiters for focus regions (stored for downstream use; not
        # referenced elsewhere in this class).
        self.focus_token = kwargs.get("focus_token", "<focus>")
        self.focus_end_token = kwargs.get("focus_end_token", "</focus>")

        # Special token prepended to the prompt to select the output mode.
        self.mode_tokens = {
            "text": "<text>",
            "point": "<point>",
            "box": "<box>",
            "polygon": "<polygon>",
        }

    def __call__(
        self,
        images: ImageInput = None,
        text: Union[str, List[str]] = None,
        mode: str = "text",
        think: bool = False,
        return_tensors: Optional[str] = None,
        **kwargs
    ) -> BatchFeature:
        """
        Process images and text for Oculus model.

        Args:
            images: Input image(s)
            text: Input text prompt(s)
            mode: Output mode ("text", "point", "box", "polygon")
            think: Enable reasoning mode
            return_tensors: Tensor format ("pt", "np", etc.), forwarded to the
                image processor and the tokenizer
            **kwargs: Extra tokenizer arguments (e.g. ``padding``,
                ``truncation``, ``max_length``)

        Returns:
            BatchFeature with processed inputs, plus the raw "mode" string and
            "think" flag
        """
        if images is not None:
            if self.image_processor is not None:
                image_features = self.image_processor(images, return_tensors=return_tensors)
            else:
                # No image processor configured: pass the image(s) through as-is,
                # normalized to a list.
                if isinstance(images, Image.Image):
                    images = [images]
                image_features = {"pixel_values": images}
        else:
            image_features = {}

        if text is not None:
            # Prepend the mode / thinking special tokens to the raw prompt(s).
            processed_text = self._format_prompt(text, mode, think)

            if self.tokenizer is not None:
                # Defaults the caller may override. Previously these were
                # hard-coded positional to the call, so passing padding= or
                # truncation= in kwargs raised a duplicate-keyword TypeError.
                kwargs.setdefault("padding", True)
                kwargs.setdefault("truncation", True)
                text_features = self.tokenizer(
                    processed_text,
                    return_tensors=return_tensors,
                    **kwargs
                )
            else:
                text_features = {"text": processed_text}
        else:
            text_features = {}

        # Tensor conversion already happened inside the image processor and
        # tokenizer above. Passing tensor_type=return_tensors here would run
        # BatchFeature's conversion again and attempt to tensorize the string
        # "mode" and bool "think" entries, which fails for e.g. "pt".
        return BatchFeature(
            data={
                **image_features,
                **text_features,
                "mode": mode,
                "think": think,
            }
        )

    def _format_prompt(
        self,
        text: Union[str, List[str]],
        mode: str,
        think: bool
    ) -> Union[str, List[str]]:
        """Format prompt with special tokens.

        Prepends the mode token (if ``mode`` is known) and the thinking token
        (if ``think``) to each prompt, joined with single spaces.
        """

        def format_single(t: str) -> str:
            parts = []

            # Unknown modes are passed through silently with no mode token.
            if mode in self.mode_tokens:
                parts.append(self.mode_tokens[mode])

            if think:
                parts.append(self.thinking_token)

            parts.append(t)

            return " ".join(parts)

        if isinstance(text, str):
            return format_single(text)
        else:
            return [format_single(t) for t in text]

    def decode(
        self,
        token_ids,
        skip_special_tokens: bool = True,
        **kwargs
    ) -> Tuple[str, Optional[str]]:
        """Decode token IDs to text, separating out any reasoning trace.

        Args:
            token_ids: Token IDs (anything the tokenizer's ``decode`` accepts).
            skip_special_tokens: Whether to drop special tokens when decoding.
            **kwargs: Extra arguments forwarded to the tokenizer.

        Returns:
            ``(text, thinking_trace)`` — ``thinking_trace`` is the content
            between the thinking delimiters (and is stripped from ``text``),
            or ``None`` if no complete trace is present.
            (The previous ``-> str`` annotation was wrong: a tuple was always
            returned.)
        """
        if self.tokenizer is not None:
            text = self.tokenizer.decode(token_ids, skip_special_tokens=skip_special_tokens, **kwargs)
        else:
            text = str(token_ids)

        # Extract the reasoning trace only when the delimiters appear in order.
        thinking_trace = None
        start_marker = text.find(self.thinking_token)
        if start_marker != -1:
            start = start_marker + len(self.thinking_token)
            # Search for the closing token only AFTER the opening token, so a
            # stray "</think>" earlier in the string cannot yield end < start
            # and produce a garbage slice.
            end = text.find(self.thinking_end_token, start)
            if end != -1:
                thinking_trace = text[start:end].strip()
                text = text[end + len(self.thinking_end_token):].strip()

        return text, thinking_trace

    def batch_decode(
        self,
        token_ids,
        skip_special_tokens: bool = True,
        **kwargs
    ) -> List[Tuple[str, Optional[str]]]:
        """Decode a batch of token ID sequences.

        Returns one ``(text, thinking_trace)`` pair per sequence — see
        :meth:`decode`. (The previous ``List[str]`` annotation was wrong.)
        """
        return [
            self.decode(ids, skip_special_tokens=skip_special_tokens, **kwargs)
            for ids in token_ids
        ]

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: str, **kwargs):
        """Load processor from pretrained.

        Falls back to an empty processor (no image processor / tokenizer) when
        the sub-components cannot be loaded.
        """
        try:
            from transformers import AutoImageProcessor, AutoTokenizer

            image_processor = AutoImageProcessor.from_pretrained(
                pretrained_model_name_or_path, **kwargs
            )
            tokenizer = AutoTokenizer.from_pretrained(
                pretrained_model_name_or_path, **kwargs
            )
            return cls(image_processor=image_processor, tokenizer=tokenizer, **kwargs)
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate; the best-effort fallback is preserved.
            return cls(**kwargs)

    def save_pretrained(self, save_directory: str, **kwargs):
        """Save processor to directory.

        Saves whichever sub-components are configured; a missing image
        processor or tokenizer is skipped silently.
        """
        if self.image_processor is not None:
            self.image_processor.save_pretrained(save_directory)
        if self.tokenizer is not None:
            self.tokenizer.save_pretrained(save_directory)