import glob
import json
import os
import random

import cv2
import numpy as np
import torch
import torch.nn.functional as F
from transformers import CLIPImageProcessor

from model.llava import conversation as conversation_lib
from model.segment_anything.utils.transforms import ResizeLongestSide

from .data_processing import get_mask_from_json
from .utils import (ANSWER_LIST, DEFAULT_IMAGE_TOKEN,
                    EXPLANATORY_QUESTION_LIST, LONG_QUESTION_LIST,
                    SHORT_QUESTION_LIST)


class ReasonSegDataset(torch.utils.data.Dataset):
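    """ReasonSeg dataset for reasoning-segmentation training.

    Each item pairs one image with prompt/answer conversations built from the
    ReasonSeg annotations and, when available, explanatory text answers.
    """
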
    # SAM preprocessing constants: ImageNet mean/std in RGB, 0-255 scale.
    pixel_mean = torch.Tensor([123.675, 116.28, 103.53]).view(-1, 1, 1)
    pixel_std = torch.Tensor([58.395, 57.12, 57.375]).view(-1, 1, 1)
    img_size = 1024
    ignore_label = 255

    def __init__(
        self,
        base_image_dir,
        tokenizer,
        vision_tower,
        samples_per_epoch=500 * 8 * 2 * 10,
        precision: str = "fp32",
        image_size: int = 224,
        num_classes_per_sample: int = 3,
        exclude_val=False,
        reason_seg_data="ReasonSeg|train",
        explanatory=0.1,
    ):
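        """Build the sample index.

        `reason_seg_data` has the form "<name>|<split>[_<split>...]", e.g.
        "ReasonSeg|train" or "ReasonSeg|train_val". `explanatory` is the
        probability of emitting a text-only explanatory answer; -1 disables
        explanatory data entirely.
        """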
        self.exclude_val = exclude_val
        self.reason_seg_data = reason_seg_data
        self.samples_per_epoch = samples_per_epoch
        self.explanatory = explanatory
        self.num_classes_per_sample = num_classes_per_sample

        self.base_image_dir = base_image_dir
        self.image_size = image_size
        self.tokenizer = tokenizer
        self.precision = precision
        self.transform = ResizeLongestSide(image_size)
        self.clip_image_processor = CLIPImageProcessor.from_pretrained(vision_tower)

        self.short_question_list = SHORT_QUESTION_LIST
        self.long_question_list = LONG_QUESTION_LIST
        self.answer_list = ANSWER_LIST

        # Collect image/annotation pairs for every requested split.
        reason_seg_data, splits = reason_seg_data.split("|")
        splits = splits.split("_")
        images = []
        for split in splits:
            images_split = glob.glob(
                os.path.join(
                    base_image_dir, "reason_seg", reason_seg_data, split, "*.jpg"
                )
            )
            images.extend(images_split)
        jsons = [path.replace(".jpg", ".json") for path in images]
        self.reason_seg_data = (images, jsons)

        print("number of reason_seg samples:", len(images))

        # Optional explanatory annotations: map image name -> query/answer text.
        if explanatory != -1:
            self.explanatory_question_list = EXPLANATORY_QUESTION_LIST
            self.img_to_explanation = {}
            with open(
                os.path.join(
                    base_image_dir,
                    "reason_seg",
                    reason_seg_data,
                    "explanatory",
                    "train.json",
                )
            ) as f:
                items = json.load(f)
            for item in items:
                img_name = item["image"]
                self.img_to_explanation[img_name] = {
                    "query": item["query"],
                    "outputs": item["outputs"],
                }

            print("len(self.img_to_explanation):", len(self.img_to_explanation))

    def __len__(self):
        # Epoch length is fixed; __getitem__ draws images at random.
        return self.samples_per_epoch

    def preprocess(self, x: torch.Tensor) -> torch.Tensor:
        """Normalize pixel values and pad to a square input."""
        # Normalize colors.
        x = (x - self.pixel_mean) / self.pixel_std

        # Pad bottom/right to img_size x img_size.
        h, w = x.shape[-2:]
        padh = self.img_size - h
        padw = self.img_size - w
        x = F.pad(x, (0, padw, 0, padh))
        return x

    def __getitem__(self, idx):
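        # The incoming idx is ignored: each call samples a random image, so
        # samples_per_epoch alone controls the effective epoch size.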
        images, jsons = self.reason_seg_data
        idx = random.randint(0, len(images) - 1)
        image_path = images[idx]
        json_path = jsons[idx]

        image = cv2.imread(image_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        ori_size = image.shape[:2]
        # Preprocess the image for the CLIP vision tower.
        image_clip = self.clip_image_processor.preprocess(image, return_tensors="pt")[
            "pixel_values"
        ][0]

        mask, sents, is_sentence = get_mask_from_json(json_path, image)
        if len(sents) >= self.num_classes_per_sample:
            sampled_inds = np.random.choice(
                list(range(len(sents))), size=self.num_classes_per_sample, replace=False
            )
        else:
            sampled_inds = list(range(len(sents)))
        sampled_sents = [sents[ind] for ind in sampled_inds]
        # Every sampled sentence refers to the same annotated region, so the
        # binary mask is repeated once per sentence.
        sampled_masks = [
            (mask == 1).astype(np.float32) for _ in range(len(sampled_inds))
        ]

        # Resize so the longest side matches the SAM input resolution.
        image = self.transform.apply_image(image)
        resize = image.shape[:2]

        image_name = image_path.split("/")[-1]
        if self.explanatory != -1 and image_name in self.img_to_explanation:
            # choice == 0: [SEG] answer only; choice == 1: [SEG] + explanation;
            # choice == 2: plain-text explanation with no mask supervision.
            if random.random() < self.explanatory:
                choice = 2
            else:
                choice = random.randint(0, 1)

        questions = []
        answers = []
        for text in sampled_sents:
            if is_sentence:
                question_template = random.choice(self.long_question_list)
                questions.append(question_template.format(sent=text))
            else:
                question_template = random.choice(self.short_question_list)
                questions.append(question_template.format(class_name=text.lower()))

            # Attach an explanatory answer when one exists for this image.
            img_name = image_path.split("/")[-1]
            if self.explanatory != -1 and img_name in self.img_to_explanation:
                if choice == 0:  # [SEG] token only
                    answers.append(random.choice(self.answer_list))
                elif choice == 1:  # [SEG] token + explanation text
                    answer = self.img_to_explanation[img_name]["outputs"]
                    answer = random.choice(self.answer_list) + " {}".format(answer)
                    questions[-1] = (
                        DEFAULT_IMAGE_TOKEN
                        + "\n"
                        + text
                        + " {}".format(random.choice(self.explanatory_question_list))
                    )
                    answers.append(answer)
                elif choice == 2:  # explanation text only, no segmentation
                    answer = self.img_to_explanation[img_name]["outputs"]
                    questions[-1] = DEFAULT_IMAGE_TOKEN + "\n" + text
                    answers.append(answer)
                else:
                    raise ValueError("Not implemented yet.")
            else:
                answers.append(random.choice(self.answer_list))

        conversations = []
        conv = conversation_lib.default_conversation.copy()
        roles = {"human": conv.roles[0], "gpt": conv.roles[1]}

        for question, answer in zip(questions, answers):
            conv.messages = []
            conv.append_message(conv.roles[0], question)
            conv.append_message(conv.roles[1], answer)
            conversations.append(conv.get_prompt())

        # SAM branch: normalize and pad to a square img_size x img_size tensor.
        image = self.preprocess(torch.from_numpy(image).permute(2, 0, 1).contiguous())

        image_name = image_path.split("/")[-1]
        if (
            self.explanatory != -1
            and image_name in self.img_to_explanation
            and choice == 2
        ):
            # Text-only answer: emit an empty (0, H, W) mask stack so no
            # segmentation loss is applied to this sample.
            masks = torch.rand(0, *ori_size)
            label = torch.ones(ori_size) * self.ignore_label
        else:
            masks = np.stack(sampled_masks, axis=0)
            masks = torch.from_numpy(masks)
            label = torch.ones(masks.shape[1], masks.shape[2]) * self.ignore_label

        return (
            image_path,
            image,
            image_clip,
            conversations,
            masks,
            label,
            resize,
            questions,
            sampled_sents,
        )
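
# Usage sketch (illustrative only; the checkpoint names and data layout below
# are assumptions, not part of this module):
#
#   from transformers import AutoTokenizer
#
#   tokenizer = AutoTokenizer.from_pretrained("llava-hf/llava-1.5-7b-hf")
#   dataset = ReasonSegDataset(
#       base_image_dir="./dataset",
#       tokenizer=tokenizer,
#       vision_tower="openai/clip-vit-large-patch14",
#       image_size=1024,
#   )
#   # idx is ignored; any index returns a randomly drawn sample.
#   (image_path, image, image_clip, conversations,
#    masks, label, resize, questions, sampled_sents) = dataset[0]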