from abc import ABC, abstractmethod
from typing import List, Dict, Union, Optional

from transformers import PretrainedConfig, AutoConfig, AutoModel

from .configuration_aimv2 import AIMv2Config
from .modeling_aimv2 import AIMv2Model

# Sentinel token ids used by the multimodal pipeline. They are negative on purpose so they
# can never collide with real vocabulary ids produced by the text tokenizer.
# IGNORE_ID matches the default ignore_index of PyTorch's cross-entropy loss.
IGNORE_ID = -100
IMAGE_TOKEN_ID = -200
IMAGE_TOKEN = "<image>"
IMAGE_ATOM_ID = -300
IMAGE_INDICATOR_IDS = [-301, -302, -303, -304, -305]

# Register the AIMv2 vision backbone so it is resolvable through the transformers Auto* APIs.
AutoConfig.register("aimv2", AIMv2Config)
AutoModel.register(AIMv2Config, AIMv2Model)
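
# Illustrative only (not executed here): after the registrations above, the custom AIMv2
# classes resolve through the standard transformers Auto* entry points, e.g.
#
#     aimv2_cfg = AutoConfig.for_model("aimv2")          # -> AIMv2Config
#     aimv2_backbone = AutoModel.from_config(aimv2_cfg)  # -> AIMv2Model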


class BaseVisualTokenizerConfig(PretrainedConfig):
    def __init__(
        self,
        vocab_size=16384,
        tokenize_function="softmax",
        tau=1.0,
        depths=None,
        drop_cls_token=False,
        backbone_config: Optional[Union[PretrainedConfig, dict]] = None,
        hidden_stride: int = 1,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.vocab_size = vocab_size
        self.tokenize_function = tokenize_function
        self.tau = tau
        # `depths` may arrive as a '|'-separated string (e.g. "3|3"); normalize it to a list of ints.
        if isinstance(depths, str):
            depths = [int(x) for x in depths.split('|')]
        self.depths = depths
        self.backbone_kwargs = {}
        self.drop_cls_token = drop_cls_token
        if backbone_config is not None:
            assert isinstance(backbone_config, (PretrainedConfig, dict)), \
                f"expected `backbone_config` to be an instance of PretrainedConfig or dict, but got {type(backbone_config)}"
            if not isinstance(backbone_config, PretrainedConfig):
                # Rebuild the backbone config from a plain dict via its registered `model_type`.
                model_type = backbone_config.pop('model_type')
                backbone_config = AutoConfig.for_model(model_type, **backbone_config)
        self.backbone_config = backbone_config
        self.hidden_stride = hidden_stride


class Aimv2VisualTokenizerConfig(BaseVisualTokenizerConfig):
    model_type = "aimv2_visual_tokenizer"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # `drop_cls_token` is not supported for this visual tokenizer; force it off.
        if self.drop_cls_token:
            self.drop_cls_token = False
        # A single depth value, when given, overrides the number of hidden layers in the backbone.
        if self.depths:
            assert len(self.depths) == 1
            self.backbone_kwargs['num_hidden_layers'] = self.depths[0]


AutoConfig.register("aimv2_visual_tokenizer", Aimv2VisualTokenizerConfig)
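
# Illustrative sketch (values are made up): a visual tokenizer config as it might appear in a
# checkpoint's config.json, rebuilt here from plain keyword arguments.
#
#     vt_cfg = Aimv2VisualTokenizerConfig(
#         vocab_size=65536,
#         tokenize_function="softmax",
#         depths="3",                               # parsed to [3] -> backbone_kwargs['num_hidden_layers'] = 3
#         backbone_config={"model_type": "aimv2"},  # rebuilt into an AIMv2Config via AutoConfig.for_model
#         hidden_stride=2,
#     )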


class OvisConfig(PretrainedConfig):
    model_type = "ovis"

    def __init__(
        self,
        llm_config: Optional[Union[PretrainedConfig, dict]] = None,
        visual_tokenizer_config: Optional[Union[PretrainedConfig, dict]] = None,
        multimodal_max_length=8192,
        hidden_size=None,
        conversation_formatter_class=None,
        llm_attn_implementation=None,
        disable_tie_weight=False,
        **kwargs
    ):
        super().__init__(**kwargs)
        if llm_config is not None:
            assert isinstance(llm_config, (PretrainedConfig, dict)), \
                f"expected `llm_config` to be an instance of PretrainedConfig or dict, but got {type(llm_config)}"
            if not isinstance(llm_config, PretrainedConfig):
                # Rebuild the LLM config from a plain dict via its registered `model_type`.
                model_type = llm_config.pop('model_type')
                llm_config = AutoConfig.for_model(model_type, **llm_config)
        self.llm_config = llm_config
        if visual_tokenizer_config is not None:
            assert isinstance(visual_tokenizer_config, (PretrainedConfig, dict)), \
                f"expected `visual_tokenizer_config` to be an instance of PretrainedConfig or dict, but got {type(visual_tokenizer_config)}"
            if not isinstance(visual_tokenizer_config, PretrainedConfig):
                # Same treatment for the visual tokenizer config.
                model_type = visual_tokenizer_config.pop('model_type')
                visual_tokenizer_config = AutoConfig.for_model(model_type, **visual_tokenizer_config)
        self.visual_tokenizer_config = visual_tokenizer_config
        self.multimodal_max_length = multimodal_max_length
        self.hidden_size = hidden_size
        self.conversation_formatter_class = conversation_formatter_class
        self.llm_attn_implementation = llm_attn_implementation
        self.disable_tie_weight = disable_tie_weight
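
# Illustrative sketch (values are made up): nested dicts, e.g. loaded back from config.json,
# are promoted to PretrainedConfig instances by the constructor above.
#
#     ovis_cfg = OvisConfig(
#         llm_config={"model_type": "qwen2", "hidden_size": 3584},
#         visual_tokenizer_config={"model_type": "aimv2_visual_tokenizer", "vocab_size": 65536},
#         multimodal_max_length=8192,
#     )
#     assert isinstance(ovis_cfg.llm_config, PretrainedConfig)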


class ConversationFormatter(ABC):
    support_tokenizer_types = None

    def __init__(self, tokenizer):
        tokenizer_type = type(tokenizer).__name__
        assert tokenizer_type in self.support_tokenizer_types, \
            f'Invalid tokenizer type, expected one of `{self.support_tokenizer_types}`, but got `{tokenizer_type}`'
        self.tokenizer = tokenizer
        self.image_token = IMAGE_TOKEN
        self.image_token_id = IMAGE_TOKEN_ID
        self.ignore_id = IGNORE_ID

    def _tokenize_with_image_symbol(self, text):
        # Tokenize the text around each `<image>` placeholder, then rejoin the chunks with the
        # sentinel IMAGE_TOKEN_ID where the placeholders were.
        text_chunks = [self.tokenizer(chunk, add_special_tokens=False).input_ids
                       for chunk in text.split(self.image_token)]
        token_ids = []
        num_chunks = len(text_chunks)
        for i, chunk in enumerate(text_chunks):
            token_ids.extend(chunk)
            if i < num_chunks - 1:
                token_ids.append(self.image_token_id)
        return token_ids
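
    # For example (illustrative; concrete ids depend on the tokenizer in use), the text
    # "Describe <image> briefly" is split around the placeholder and rejoined as
    # ids("Describe ") + [IMAGE_TOKEN_ID] + ids(" briefly"), with IMAGE_TOKEN_ID == -200.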

    @abstractmethod
    def format(self, conversations: List[Dict], generation_preface=None):
        pass

    @abstractmethod
    def format_query(self, query, generation_preface=""):
        pass


class QwenConversationFormatter(ConversationFormatter):
    support_tokenizer_types = ['QWenTokenizer', 'Qwen2TokenizerFast']

    def __init__(self, tokenizer):
        super().__init__(tokenizer)
        self.from2role = {
            "system": "<|im_start|>system\n",
            "human": "<|im_start|>user\n",
            "gpt": "<|im_start|>assistant\n",
        }
        self.gpt_token_num = None
        self.im_end = "<|im_end|>\n"
        self.default_system_prompt = "You are a helpful assistant."

    def format(self, conversations: List[Dict], generation_preface=None):
        # Cache the token length of the assistant role prefix; it is used below to mask the
        # role tokens out of the training labels.
        if self.gpt_token_num is None:
            self.gpt_token_num = len(self.tokenizer(self.from2role["gpt"], add_special_tokens=False).input_ids)

        if conversations[0]["from"] != "system":
            conversations.insert(0, {
                "from": "system",
                "value": self.default_system_prompt
            })

        if generation_preface is not None:
            conversations.append({
                "from": "gpt",
                "value": generation_preface
            })

        prompt = ""
        input_ids = []
        labels = []
        num_conversation = len(conversations)
        for i, conversation in enumerate(conversations):
            frm = conversation["from"]
            role = self.from2role[frm]
            message = conversation["value"]
            text = role + message
            if i < num_conversation - 1 or generation_preface is None:
                text += self.im_end
            prompt += text
            token_ids = self._tokenize_with_image_symbol(text)
            input_ids.extend(token_ids)
            label_ids = [self.ignore_id] * len(token_ids)
            if frm == "gpt" and generation_preface is None:
                # Supervise only the assistant's reply: skip the role prefix tokens and the
                # trailing `\n` after `<|im_end|>`, keeping everything in between as labels.
                label_ids[self.gpt_token_num:-1] = token_ids[self.gpt_token_num:-1]
            labels.extend(label_ids)

        assert self._tokenize_with_image_symbol(prompt) == input_ids
        assert len(input_ids) == len(labels)

        return prompt, input_ids, labels

    def format_query(self, query, generation_preface=""):
        # Wrap a single user query as a one-turn conversation ending with the assistant role
        # prefix (plus an optional preface), ready for generation.
        prompt, input_ids, _ = self.format([{
            "from": "human",
            "value": query
        }], generation_preface=generation_preface)

        return prompt, input_ids
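
# Illustrative usage sketch (assumes a Qwen2 fast tokenizer; the checkpoint name below is
# only an example, not a requirement of this module):
#
#     from transformers import AutoTokenizer
#
#     tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2-7B-Instruct")
#     formatter = QwenConversationFormatter(tokenizer)
#     prompt, input_ids = formatter.format_query(f"Describe this image: {IMAGE_TOKEN}")
#     # `prompt` is the chat-formatted text; `input_ids` holds ordinary token ids plus
#     # IMAGE_TOKEN_ID (-200) at the position of the `<image>` placeholder.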