Valley2.5 / processing_valley.py
Hyggge's picture
feat: modify file type of *.py, *.txt, etc. to change storage method
64c250f
import re
import types
import io
import torch
import os
from PIL import Image
import argparse
from qwen_vl_utils import fetch_image
from transformers import (
ProcessorMixin,
SiglipImageProcessor,
BatchFeature,
Qwen2VLImageProcessor,
PreTrainedTokenizer,
AutoImageProcessor,
CLIPImageProcessor,
)
from .utils import (
process_anyres_image,
preprocess_image_ovis,
ovis_template_process,
BLACK_IMG_ENV,
DEFAULT_IM_END_TOKEN,
DEFAULT_IM_START_TOKEN,
DEFAULT_IMAGE_TOKEN,
DEFAULT_VI_END_TOKEN,
DEFAULT_VI_START_TOKEN,
DEFAULT_VIDEO_TOKEN,
IMAGE_TOKEN_INDEX,
SEQ_MAX_LEN,
IGNORE_INDEX,
)
siglip_processor_config = {
"do_normalize": True,
"do_rescale": True,
"do_resize": True,
"image_mean": [
0.5,
0.5,
0.5
],
"image_processor_type": "SiglipImageProcessor",
"image_std": [
0.5,
0.5,
0.5
],
"processor_class": "SiglipProcessor",
"resample": 3,
"rescale_factor": 0.00392156862745098,
"size": {
"height": 384,
"width": 384
}
}
qwen2vl_processor_config = {
"min_pixels": 3136,
"max_pixels": 12845056,
"patch_size": 14,
"temporal_patch_size": 2,
"merge_size": 2,
"image_mean": [
0.48145466,
0.4578275,
0.40821073
],
"image_std": [
0.26862954,
0.26130258,
0.27577711
],
"image_processor_type": "Qwen2VLImageProcessor",
"processor_class": "Qwen2VLProcessor"
}
aimv2_processor_config = {
"crop_size": {
"height": 448,
"width": 448
},
"do_center_crop": True,
"do_convert_rgb": True,
"do_normalize": True,
"do_rescale": True,
"do_resize": True,
"image_mean": [
0.48145466,
0.4578275,
0.40821073
],
"image_processor_type": "CLIPImageProcessor",
"image_std": [
0.26862954,
0.26130258,
0.27577711
],
"resample": 3,
"rescale_factor": 0.00392156862745098,
"size": {
"shortest_edge": 448
}
}
class ValleyProcessor(ProcessorMixin):
attributes = ["tokenizer"]
optional_attributes = [
"max_pixels",
"min_pixels",
"anyres",
"only_crop_single_image",
"grid_pinpoints",
"use_special_start_end_token",
"only_navit",
"chat_template",
"process_mode",
]
tokenizer_class = "AutoTokenizer"
def __init__(self, tokenizer=None, chat_template=None, **kwargs):
super().__init__(tokenizer=tokenizer, chat_template=chat_template, **kwargs)
self.black_img = BLACK_IMG_ENV
self.siglip_image_processor = SiglipImageProcessor.from_dict(siglip_processor_config)
self.qwen2vl_image_processor = Qwen2VLImageProcessor.from_dict(qwen2vl_processor_config)
self.aimv2_image_processor = CLIPImageProcessor.from_dict(aimv2_processor_config)
self.anyres = kwargs.get("anyres", True)
self.grid_pinpoints = kwargs.get("grid_pinpoints", "(1x1),...,(3x3)")
self.only_crop_single_image = kwargs.get("only_crop_single_image", True)
self.use_special_start_end_token = kwargs.get("use_special_start_end_token", True)
self.only_navit = kwargs.get("only_navit", False)
self.process_mode = kwargs.get("process_mode", "qwen3")
self.aimv2_crop_size = self.aimv2_image_processor.size["shortest_edge"]
def preprocess_images_siglip(self, images) -> torch.FloatTensor:
if isinstance(images[0], str):
images_pil = [Image.open(img).convert("RGB") for img in images]
elif isinstance(images[0], Image.Image):
images_pil = [img.convert("RGB") for img in images]
elif isinstance(images[0], bytes):
images_pil = [Image.open(io.BytesIO(img)).convert("RGB") for img in images]
else:
raise ValueError("unsupported type")
processed_images = []
have_multi_images = len(images_pil) > 1
for img in images_pil:
if self.anyres:
if not self.only_crop_single_image or not have_multi_images:
image = process_anyres_image(img, self.siglip_image_processor, self.grid_pinpoints)
else:
image = [self.siglip_image_processor(img, return_tensors="pt")["pixel_values"][0]]
else:
image = self.siglip_image_processor(img, return_tensors="pt")["pixel_values"][0]
processed_images.append(image)
if not self.anyres:
return torch.stack(processed_images, dim=0)
else:
return [torch.stack(img, dim=0) for img in processed_images]
def preprocess_images_qwen2vl(self, images) -> dict:
if isinstance(images[0], str):
images_pil = [Image.open(img).convert("RGB") for img in images]
elif isinstance(images[0], Image.Image):
images_pil = [img.convert("RGB") for img in images]
elif isinstance(images[0], bytes):
images_pil = [Image.open(io.BytesIO(img)).convert("RGB") for img in images]
else:
raise ValueError("unsupported type")
image_sizes = [[x.size for x in images_pil]]
data_dict_qwen2vl = self.qwen2vl_image_processor(
[fetch_image({"image": img}) for img in images_pil],
return_tensors="pt"
)
data_dict_qwen2vl["image_sizes"] = image_sizes
return data_dict_qwen2vl
def preprocess_multimodal(self, conversations):
for sentence in conversations:
if sentence["role"] == "system":
continue
segs = re.split(DEFAULT_IMAGE_TOKEN, sentence["content"])
if self.use_special_start_end_token:
sentence["content"] = (DEFAULT_IM_START_TOKEN + DEFAULT_IMAGE_TOKEN + DEFAULT_IM_END_TOKEN).join(segs)
else:
sentence["content"] = DEFAULT_IMAGE_TOKEN.join(segs)
return conversations
def preprocess_images_aimv2(self, images) -> torch.FloatTensor:
processed_images = []
image_sizes_list = []
have_multi_images = len(images) > 1
for image_file in images:
if isinstance(image_file, str):
img = Image.open(image_file).convert("RGB")
elif isinstance(image_file, Image.Image):
img = image_file.convert("RGB")
elif isinstance(image_file, bytes):
img = Image.open(io.BytesIO(image_file)).convert("RGB")
else:
raise ValueError("unsupported type")
image_sizes_list.append(img.size)
if self.anyres:
if not self.only_crop_single_image or not have_multi_images:
img, ovis_image_placeholders = preprocess_image_ovis(img, image_processor=self.aimv2_image_processor, crop_size=self.aimv2_crop_size, max_partition=9)
else:
img, ovis_image_placeholders = preprocess_image_ovis(img, image_processor=self.aimv2_image_processor, crop_size=self.aimv2_crop_size, max_partition=1)
else:
img, ovis_image_placeholders = preprocess_image_ovis(img, image_processor=self.aimv2_image_processor, crop_size=self.aimv2_crop_size, max_partition=1)
img = (img, ovis_image_placeholders)
processed_images.append(img)
if not self.anyres:
return [(img[0], img[1]) for img in processed_images], [image_sizes_list]
else:
return [(torch.cat(img[0], dim=0), img[1]) for img in processed_images], [image_sizes_list]
def preprocess_qwen2(
self,
conversations,
tokenizer: PreTrainedTokenizer,
has_image: bool = False,
inference: bool = False,
only_mask_system: bool = False,
) -> dict:
conv = types.SimpleNamespace(
system="You are a helpful assistant.",
roles=("user", "assistant"),
version="qwen2",
offset=0,
sep="<|im_start|>",
sep2="<|im_end|>\n",
)
# Check system prompt
assert conversations[0]["role"] == "system"
if conversations[0]["content"] == None:
conversations[0]["content"] = conv.system # use default system prompt
# Check conversation sequence
for j, sentence in enumerate(conversations[1:]):
role = sentence["role"]
assert role == conv.roles[j % 2], "The conversation sequence is incorrect."
conversation_str = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=inference)
# Mask targets
rounds = conversation_str.split(conv.sep2)
input_ids_ = torch.tensor([], dtype=torch.int64)
targets_ = torch.tensor([], dtype=torch.int64)
for i, rou in enumerate(rounds):
if rou == "":
continue
if (not inference) or (i < (len(rounds) - 1)):
rou += conv.sep2
if has_image:
cur_input_ids_ = self.tokenizer_image_token(rou, tokenizer, return_tensors='pt')
input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0)
if only_mask_system:
mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[0]}\n[\s\S]*', f'{conv.roles[0]}:', rou),
tokenizer))
else:
mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[1]}\n[\s\S]*', f'{conv.roles[1]}:', rou),
tokenizer))
targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0)
else:
cur_input_ids_ = tokenizer(rou, return_tensors='pt')["input_ids"][0, :]
input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0)
mask_len = len(tokenizer(re.sub(rf'{conv.roles[1]}\n[\s\S]*', rf'{conv.roles[1]}:', rou))["input_ids"][:])
targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0)
return {"input_ids": input_ids_, "labels": targets_}
def preprocess_qwen3(
self,
conversations,
tokenizer: PreTrainedTokenizer,
has_image: bool = False,
inference: bool = False,
only_mask_system: bool = False,
enable_thinking: bool = False, #ZYF Modify to support enable_thinking
) -> dict:
conv = types.SimpleNamespace(
system="You are a helpful assistant.",
roles=("user", "assistant"),
version="qwen3",
offset=0,
sep="<|im_start|>",
sep2="<|im_end|>\n",
)
#print(conversations)
# Check system prompt
assert conversations[0]["role"] == "system"
if conversations[0]["content"] == None:
conversations[0]["content"] = conv.system # use default system prompt
# if conversations[0]['role'] == "system":
# conversations = conversations[1:]
# Check conversation sequence
# print(conversations)
for j, sentence in enumerate(conversations[1:]):
role = sentence["role"]
assert role == conv.roles[j % 2], "The conversation sequence is incorrect."
conversation_str = tokenizer.apply_chat_template(conversations, tokenize=False, add_generation_prompt=inference, enable_thinking=enable_thinking) #ZYF Modify to support thinking
# Mask targets
rounds = conversation_str.split(conv.sep2)
input_ids_ = torch.tensor([], dtype=torch.int64)
targets_ = torch.tensor([], dtype=torch.int64)
for i, rou in enumerate(rounds):
if rou == "":
continue
if (not inference) or (i < (len(rounds) - 1)):
rou += conv.sep2
if has_image:
cur_input_ids_ = self.tokenizer_image_token(rou, tokenizer, return_tensors='pt')
input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0)
if only_mask_system:
mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[0]}\n[\s\S]*', f'{conv.roles[0]}:', rou),
tokenizer))
else:
mask_len = len(self.tokenizer_image_token(re.sub(rf'{conv.roles[1]}\n[\s\S]*', f'{conv.roles[1]}:', rou),
tokenizer))
targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0)
else:
cur_input_ids_ = tokenizer(rou, return_tensors='pt')["input_ids"][0, :]
input_ids_ = torch.cat([input_ids_, cur_input_ids_], dim=0)
mask_len = len(tokenizer(re.sub(rf'{conv.roles[1]}\n[\s\S]*', rf'{conv.roles[1]}:', rou))["input_ids"][:])
targets_ = torch.cat([targets_, torch.tensor([-100] * mask_len), cur_input_ids_[mask_len:]], dim=0)
return {"input_ids": input_ids_, "labels": targets_}
def preprocess_ovis2(
self,
source, # do not include system prompt
tokenizer: PreTrainedTokenizer,
has_image: bool = False,
inference: bool = False,
only_mask_system: bool = False,
video_len: int = 0,
):
# print(source)
judge_format = "from" in source[0].keys()
if judge_format:
if source[-1]["from"] == "gpt":
source = source[:-1]
roles = {"human": 'user', "gpt": 'assistant'}
input_ids = []
labels = []
messages = "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
for message in source:
if message["from"] == "human":
user = message["value"]
if '<image>' not in user and '<video>' not in user:
messages += f"<|im_start|>{roles['human']}\n" + user + "<|im_end|>\n"
if '<image>' in user:
# import re
# image_count = user.count('<image>')
# user = re.sub(r'<image>', '', user).strip()
# user = '\n'.join([f'Image {i+1}: <image>' for i in range(image_count)]) + '\n' + user
messages += f"<|im_start|>{roles['human']}\n" + user + "<|im_end|>\n"
if '<video>' in user:
user = user.replace('<video>', '\n'.join(['<image>'] * video_len) + '\n')
messages += f"<|im_start|>{roles['human']}\n" + user + "<|im_end|>\n"
elif message["from"] == "gpt":
assistant = message["value"]
messages += f"<|im_start|>{roles['gpt']}\n" + assistant + "<|im_end|>\n"
if inference:
messages += f"<|im_start|>{roles['gpt']}\n"
else:
messages = messages[:-1] # remove the final '\n',keep <|im_end|> as the end
messages = messages.split('<image>')
messages = [tokenizer.encode(m) for m in messages]
for m in messages[:-1]:
input_ids += m
input_ids += [IMAGE_TOKEN_INDEX]
input_ids += messages[-1]
# mask last assistant
head_id = tokenizer.encode(f'<|im_start|>{roles["gpt"]}\n')
last_id = None
for i, id in enumerate(input_ids):
if input_ids[i:i+len(head_id)] == head_id:
last_id = i+len(head_id)
if i+len(head_id) > len(input_ids):
break
assert last_id != None
labels = len(input_ids) * [IGNORE_INDEX]
labels[last_id:] = input_ids[last_id:]
return {"input_ids": torch.tensor(input_ids), "labels": torch.tensor(labels)}
else:
if source[-1]["role"] == "assistant":
source = source[:-1]
input_ids = []
labels = []
messages = "<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n"
for message in source:
if message["role"] == "user":
user = message["value"]
if '<image>' not in user and '<video>' not in user:
messages += f"<|im_start|>user\n" + user + "<|im_end|>\n"
if '<image>' in user:
# import re
# image_count = user.count('<image>')
# user = re.sub(r'<image>', '', user).strip()
# user = '\n'.join([f'Image {i+1}: <image>' for i in range(image_count)]) + '\n' + user
messages += f"<|im_start|>user\n" + user + "<|im_end|>\n"
if '<video>' in user:
user = user.replace('<video>', '\n'.join(['<image>'] * video_len) + '\n')
messages += f"<|im_start|>user\n" + user + "<|im_end|>\n"
elif message["role"] == "assistant":
assistant = message["value"]
messages += f"<|im_start|>assistant\n" + assistant + "<|im_end|>\n"
if inference:
messages += f"<|im_start|>assistant\n"
else:
messages = messages[:-1] # remove the final '\n',keep <|im_end|> as the end
messages = messages.split('<image>')
messages = [tokenizer.encode(m) for m in messages]
for m in messages[:-1]:
input_ids += m
input_ids += [IMAGE_TOKEN_INDEX]
input_ids += messages[-1]
# mask last assistant
head_id = tokenizer.encode(f'<|im_start|>assistant\n')
last_id = None
for i, id in enumerate(input_ids):
if input_ids[i:i+len(head_id)] == head_id:
last_id = i+len(head_id)
if i+len(head_id) > len(input_ids):
break
assert last_id != None
labels = len(input_ids) * [IGNORE_INDEX]
labels[last_id:] = input_ids[last_id:]
return {"input_ids": torch.tensor(input_ids), "labels": torch.tensor(labels)}
def tokenizer_image_token(
self,
prompt,
tokenizer,
image_token_index=IMAGE_TOKEN_INDEX,
return_tensors=None,
):
def split_with_token(string, token):
result = string.split(token)
for i in range(len(result) - 1):
result.insert(i * 2 + 1, token)
return result
if len(prompt) > SEQ_MAX_LEN:
raise ValueError("sequence is too long !!!")
prompt_chunks = split_with_token(prompt, DEFAULT_IMAGE_TOKEN)
input_ids, offset = ([tokenizer.bos_token_id], 1) if getattr(tokenizer,'bos_token',None) else ([], 0)
token2index = {DEFAULT_IMAGE_TOKEN: image_token_index}
for chunk in prompt_chunks:
if chunk in token2index:
input_ids.append(token2index[chunk])
else:
chunk_ids = tokenizer(chunk).input_ids
if chunk_ids[0] != getattr(tokenizer,'bos_token_id', None):
offset = 0
input_ids.extend(chunk_ids[offset:])
if return_tensors is not None:
if return_tensors == "pt":
return torch.tensor(input_ids, dtype=torch.long)
raise ValueError(f"Unsupported tensor type: {return_tensors}")
return input_ids
def __call__(self, messages, inference=True, **kwargs) -> BatchFeature:
# print("+++++++++++"*5+"Process get"+"++++++++++"*5)
# print(messages)
# print("+++++++++++"*10)
process_mode = self.process_mode
if process_mode == "ovis2":
video_len = kwargs.get('video_len', 0)
# max_tile_num = kwargs.get('max_tile_num', 1)
if "images" not in messages or not messages["images"] or not messages["images"][0]:
images = [self.black_img]
elif type(messages["images"]) == str:
images = [messages["images"]]
else:
images = messages["images"]
conversations = messages["conversations"]
# adapt for user-assistant format, transform to human-gpt format
if "role" in conversations[0]:
new_conversations = []
for conversation in conversations:
if conversation["role"] == "system":
new_conversations.append({"from": "system", "value": conversation["content"]})
elif conversation["role"] == "user":
new_conversations.append({"from": "human", "value": conversation["content"]})
elif conversation["role"] == "assistant":
new_conversations.append({"from": "gpt", "value": conversation["content"]})
conversations = new_conversations
# add <image> token
first_conv = conversations[1] if conversations[0]["from"] == "system" else conversations[0]
if images and "<image>" not in first_conv["value"]:
image_token = "\n".join(["<image>"] * len(images))
first_conv["value"] = f"{image_token}\n{first_conv['value']}"
data_dict = self.preprocess_ovis2(conversations, self.tokenizer, has_image=True, only_mask_system=False, inference=inference, video_len=video_len)
data_dict['images'], data_dict['image_sizes'] = self.preprocess_images_aimv2(images)
data_dict = ovis_template_process(data_dict)
# be batch
data_dict['images'] = [data_dict['images']]
data_dict['input_ids'] = data_dict['input_ids'].unsqueeze(0)
return BatchFeature(data={**data_dict})
elif process_mode == "qwen2" or process_mode == "qwen3":
max_pixels=kwargs.get("max_pixels", self.max_pixels)
min_pixels=kwargs.get("min_pixels", self.min_pixels)
if max_pixels is not None:
self.qwen2vl_image_processor.max_pixels = max_pixels
if min_pixels is not None:
self.qwen2vl_image_processor.min_pixels = min_pixels
# Deal with images
if "images" not in messages or not messages["images"] or not messages["images"][0]:
images = [self.black_img]
elif type(messages["images"]) == str:
images = [messages["images"]]
else:
images = messages["images"]
# Deal with conversations
conversations = messages["conversations"]
if conversations[0]["role"] != "system":
conversations = [{"role":"system", "content": None}] + conversations # dummy system prompt
# Insert special token `<image>`
assert conversations[1]["role"] == "user"
if images and "<image>" not in conversations[1]["content"]:
image_token = " ".join(["<image>"] * len(images))
conversations[1]["content"] = f"{image_token}\n{conversations[1]['content']}"
# The last message should be assistant if inference=True
if inference:
assert conversations[-1]["role"] == "user", "the last message should be assistant if inference=True"
# Image preprocess
if self.only_navit:
precessed_images_siglip = None
else:
precessed_images_siglip = self.preprocess_images_siglip(images)
processed_data_dict_qwen2vl = self.preprocess_images_qwen2vl(images)
source = self.preprocess_multimodal(conversations)
if process_mode == "qwen2":
data_dict = self.preprocess_qwen2(source, self.tokenizer, has_image=True, only_mask_system=False, inference=inference)
if process_mode == "qwen3":
# ZYF Modify to support thinking
enable_thinking = kwargs.get("enable_thinking", True) #默认开启
data_dict = self.preprocess_qwen3(source, self.tokenizer, has_image=True, only_mask_system=False, inference=inference, enable_thinking=enable_thinking)
# Construct batch data
data_dict["input_ids"] = data_dict["input_ids"].unsqueeze(0) # batch_size = 1
data_dict["labels"] = data_dict["labels"].unsqueeze(0)
data_dict["images"] = [precessed_images_siglip]
return BatchFeature(data={**data_dict, **processed_data_dict_qwen2vl})
else:
raise ValueError(f"Unsupported process mode: {process_mode}")
def batch_decode(self, *args, **kwargs):
"""
This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.batch_decode`]. Please
refer to the docstring of this method for more information.
"""
return self.tokenizer.batch_decode(*args, **kwargs)
def decode(self, *args, **kwargs):
"""
This method forwards all its arguments to Qwen2TokenizerFast's [`~PreTrainedTokenizer.decode`]. Please refer to
the docstring of this method for more information.
"""
return self.tokenizer.decode(*args, **kwargs)