import base64
import io
import json
import os

import torch
from PIL import Image
from accelerate import load_checkpoint_and_dispatch, init_empty_weights
from transformers import AutoTokenizer, AutoModel

from omnilmm.utils import disable_torch_init
from omnilmm.model.omnilmm import OmniLMMForCausalLM
from omnilmm.model.utils import build_transform
from omnilmm.train.train_utils import omni_preprocess

DEFAULT_IMAGE_TOKEN = "<image>"
DEFAULT_IMAGE_PATCH_TOKEN = "<im_patch>"
DEFAULT_IM_START_TOKEN = "<im_start>"
DEFAULT_IM_END_TOKEN = "<im_end>"

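# Load the OmniLMM 12B model, its image transform, and tokenizer, registering
# the image special tokens and wiring their ids into the vision config.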
def init_omni_lmm(model_path, multi_gpus=False):
    torch.backends.cuda.matmul.allow_tf32 = True
    disable_torch_init()
    model_name = os.path.expanduser(model_path)
    print(f'Load omni_lmm model and tokenizer from {model_name}')
    tokenizer = AutoTokenizer.from_pretrained(
        model_name, model_max_length=2048)

    if multi_gpus:
        # Shard the checkpoint across GPUs: build the model on the meta device,
        # then let accelerate place and load the weights.
        with init_empty_weights():
            model = OmniLMMForCausalLM.from_pretrained(
                model_name, tune_clip=True, torch_dtype=torch.bfloat16)
        model = load_checkpoint_and_dispatch(
            model, model_name, dtype=torch.bfloat16, device_map="auto",
            no_split_module_classes=['Eva', 'MistralDecoderLayer', 'ModuleList', 'Resampler'])
    else:
        model = OmniLMMForCausalLM.from_pretrained(
            model_name, tune_clip=True, torch_dtype=torch.bfloat16
        ).to(device='cuda', dtype=torch.bfloat16)

    image_processor = build_transform(
        is_train=False, input_size=model.model.config.image_size, std_mode='OPENAI_CLIP')

    mm_use_im_start_end = getattr(model.config, "mm_use_im_start_end", False)
    assert mm_use_im_start_end

    tokenizer.add_tokens([DEFAULT_IMAGE_PATCH_TOKEN, DEFAULT_IM_START_TOKEN,
                          DEFAULT_IM_END_TOKEN], special_tokens=True)

    vision_config = model.model.vision_config
    vision_config.im_patch_token = tokenizer.convert_tokens_to_ids(
        [DEFAULT_IMAGE_PATCH_TOKEN])[0]
    vision_config.use_im_start_end = mm_use_im_start_end
    vision_config.im_start_token, vision_config.im_end_token = tokenizer.convert_tokens_to_ids(
        [DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN])
    image_token_len = model.model.config.num_query

    return model, image_processor, image_token_len, tokenizer

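# Prompt expansion: a leading user turn such as "<image>\nWhat is this?" becomes
# "<im_start>" + "<im_patch>" * image_token_len + "<im_end>" + "\nWhat is this?";
# if no "<image>" placeholder is present, the span is prepended instead.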
def expand_question_into_multimodal(question_text, image_token_len, im_st_token, im_ed_token, im_patch_token):
    if DEFAULT_IMAGE_TOKEN in question_text[0]['content']:
        question_text[0]['content'] = question_text[0]['content'].replace(
            DEFAULT_IMAGE_TOKEN, im_st_token + im_patch_token * image_token_len + im_ed_token)
    else:
        question_text[0]['content'] = im_st_token + im_patch_token * \
            image_token_len + im_ed_token + '\n' + question_text[0]['content']
    return question_text

def wrap_question_for_omni_lmm(question, image_token_len, tokenizer):
    question = expand_question_into_multimodal(
        question, image_token_len, DEFAULT_IM_START_TOKEN, DEFAULT_IM_END_TOKEN, DEFAULT_IMAGE_PATCH_TOKEN)

    conversation = question
    data_dict = omni_preprocess(sources=[conversation],
                                tokenizer=tokenizer,
                                generation=True)

    data_dict = dict(input_ids=data_dict["input_ids"][0],
                     labels=data_dict["labels"][0])
    return data_dict

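# Wrapper around the OmniLMM 12B checkpoint exposing a chat(input) interface
# over base64-encoded images and JSON-encoded message lists.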
class OmniLMM12B:
    def __init__(self, model_path) -> None:
        model, img_processor, image_token_len, tokenizer = init_omni_lmm(model_path)
        self.model = model
        self.image_token_len = image_token_len
        self.image_transform = img_processor
        self.tokenizer = tokenizer
        self.model.eval()

    def decode(self, image, input_ids):
        with torch.inference_mode():
            output = self.model.generate_vllm(
                input_ids=input_ids.unsqueeze(0).cuda(),
                images=image.unsqueeze(0).half().cuda(),
                temperature=0.6,
                max_new_tokens=1024,
                do_sample=True,
                output_scores=True,
                return_dict_in_generate=True,
                repetition_penalty=1.1,
                top_k=30,
                top_p=0.9,
            )

        response = self.tokenizer.decode(
            output.sequences[0], skip_special_tokens=True)
        response = response.strip()
        return response

    def chat(self, input):
        try:
            image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
        except Exception:
            return "Image decode error"

        msgs = json.loads(input['question'])
        input_ids = wrap_question_for_omni_lmm(
            msgs, self.image_token_len, self.tokenizer)['input_ids']
        input_ids = torch.as_tensor(input_ids)
        image = self.image_transform(image)

        out = self.decode(image, input_ids)
        return out


def img2base64(file_name):
    # Read an image file and return its base64-encoded bytes.
    with open(file_name, 'rb') as f:
        encoded_string = base64.b64encode(f.read())
    return encoded_string

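# Fallback wrapper for earlier MiniCPM-V checkpoints; their remote-code chat()
# returns a 3-tuple whose first element is the answer.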

class MiniCPMV:
    def __init__(self, model_path) -> None:
        self.model = AutoModel.from_pretrained(model_path, trust_remote_code=True).to(dtype=torch.bfloat16)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        self.model.eval().cuda()

    def chat(self, input):
        try:
            image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
        except Exception:
            return "Image decode error"

        msgs = json.loads(input['question'])
        answer, context, _ = self.model.chat(
            image=image,
            msgs=msgs,
            context=None,
            tokenizer=self.tokenizer,
            sampling=True,
            temperature=0.7
        )
        return answer


class MiniCPMV2_5:
    def __init__(self, model_path) -> None:
        self.model = AutoModel.from_pretrained(model_path, trust_remote_code=True).to(dtype=torch.float16)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
        self.model.eval().cuda()

    def chat(self, input):
        try:
            image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
        except Exception:
            return "Image decode error"

        msgs = json.loads(input['question'])
        # MiniCPM-Llama3-V 2.5's chat() returns the answer string directly.
        answer = self.model.chat(
            image=image,
            msgs=msgs,
            tokenizer=self.tokenizer,
            sampling=True,
            temperature=0.7
        )
        return answer

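# MiniCPM-V 2.6 wrapper, optionally sharded across two GPUs. The manual
# device_map edits keep the vision tower, resampler, and embeddings together
# and rebalance decoder layers between devices.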

class MiniCPMV2_6:
    def __init__(self, model_path, multi_gpus=False) -> None:
        print('torch_version:', torch.__version__)
        if multi_gpus:
            from accelerate import load_checkpoint_and_dispatch, init_empty_weights, infer_auto_device_map
            with init_empty_weights():
                model = AutoModel.from_pretrained(model_path, trust_remote_code=True,
                                                  attn_implementation='sdpa', torch_dtype=torch.bfloat16)

            device_map = infer_auto_device_map(model, max_memory={0: "10GB", 1: "10GB"},
                                               no_split_module_classes=['SiglipVisionTransformer', 'Qwen2DecoderLayer'])
            # Keep the lm_head, vision tower, and resampler on the same device
            # as the input embeddings.
            device_id = device_map["llm.model.embed_tokens"]
            device_map["llm.lm_head"] = device_id
            device_map["vpm"] = device_id
            device_map["resampler"] = device_id
            # Move decoder layers 8-16 onto the device holding layer 26 to
            # balance memory across the two GPUs.
            device_id2 = device_map["llm.model.layers.26"]
            for i in range(8, 17):
                device_map[f"llm.model.layers.{i}"] = device_id2
            print(device_map)

            self.model = load_checkpoint_and_dispatch(model, model_path, dtype=torch.bfloat16, device_map=device_map)
            self.model.eval()
        else:
            self.model = AutoModel.from_pretrained(model_path, trust_remote_code=True,
                                                   attn_implementation='sdpa', torch_dtype=torch.bfloat16)
            self.model.eval().cuda()

        self.tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)

    def chat(self, input):
        image = None
        if "image" in input and len(input["image"]) > 10:
            try:
                image = Image.open(io.BytesIO(base64.b64decode(input['image']))).convert('RGB')
            except Exception:
                return "Image decode error"

        msgs = json.loads(input["question"])

        # Normalize message contents: plain strings become one-element lists;
        # {'type': 'text'|'image', 'pairs': ...} dicts are unpacked into strings
        # or decoded into PIL images.
        for msg in msgs:
            contents = msg.pop('content')
            if isinstance(contents, str):
                contents = [contents]

            new_cnts = []
            for c in contents:
                if isinstance(c, dict):
                    if c['type'] == 'text':
                        c = c['pairs']
                    elif c['type'] == 'image':
                        c = Image.open(io.BytesIO(base64.b64decode(c["pairs"]))).convert('RGB')
                    else:
                        raise ValueError("content type only supports text and image.")
                new_cnts.append(c)
            msg['content'] = new_cnts
        print(f'msgs: {str(msgs)}')

        answer = self.model.chat(
            image=image,
            msgs=msgs,
            tokenizer=self.tokenizer,
        )
        return answer

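# Factory that dispatches to the right wrapper based on substrings of the
# checkpoint path or name.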
class MiniCPMVChat:
    def __init__(self, model_path, multi_gpus=False) -> None:
        if '12B' in model_path:
            self.model = OmniLMM12B(model_path)
        elif 'MiniCPM-Llama3-V' in model_path:
            self.model = MiniCPMV2_5(model_path)
        elif 'MiniCPM-V-2_6' in model_path:
            self.model = MiniCPMV2_6(model_path, multi_gpus)
        else:
            self.model = MiniCPMV(model_path)

    def chat(self, input):
        return self.model.chat(input)

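# Smoke test: a two-turn conversation about a local image.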
if __name__ == '__main__':
    model_path = 'openbmb/OmniLMM-12B'
    chat_model = MiniCPMVChat(model_path)

    im_64 = img2base64('./assets/worldmap_ck.jpg')

    # First turn.
    msgs = [{"role": "user", "content": "What is interesting about this image?"}]
    input = {"image": im_64, "question": json.dumps(msgs, ensure_ascii=True)}
    answer = chat_model.chat(input)
    print(msgs[-1]["content"] + '\n', answer)

    # Second turn: append the model's reply, then ask a follow-up.
    msgs.append({"role": "assistant", "content": answer})
    msgs.append({"role": "user", "content": "Where is China in the image?"})
    input = {"image": im_64, "question": json.dumps(msgs, ensure_ascii=True)}
    answer = chat_model.chat(input)
    print(msgs[-1]["content"] + '\n', answer)