|
|
import os |
|
|
|
|
|
# Shell steps executed at start-up, in order: install the local ``fairseq``
# directory with pip (the fairseq imports further down come after this),
# then list the working directory.
for _startup_command in (
    'cd fairseq;pip install ./; cd ..',
    'ls -l',
):
    os.system(_startup_command)
|
|
|
|
|
import torch |
|
|
import numpy as np |
|
|
import re |
|
|
from utils.eval_utils import eval_step |
|
|
from fairseq import utils,tasks |
|
|
from fairseq import checkpoint_utils |
|
|
from fairseq import distributed_utils, options, tasks, utils |
|
|
from fairseq.dataclass.utils import convert_namespace_to_omegaconf |
|
|
from utils.zero_shot_utils import zero_shot_step |
|
|
from tasks.mm_tasks.vqa_gen import VqaGenTask |
|
|
from tasks.mm_tasks.refcoco import RefcocoTask |
|
|
from models.ofa import OFAModel |
|
|
from PIL import Image |
|
|
from torchvision import transforms |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
# Call fairseq's task registration hook for the 'refcoco' name.
tasks.register_task('refcoco', RefcocoTask)

# Use the GPU whenever torch can see one.
use_cuda = torch.cuda.is_available()

# Half precision is switched off for this demo.
use_fp16 = False

# Fetch the checkpoint only when it is not already in place, so restarting
# the app does not download the same file again.
if not os.path.isfile('checkpoints/checkpoint.best_score_0.9840.pt'):
    os.system(
        'wget https://huggingface.co/komleva/VQAmodel/resolve/main/checkpoint.best_score_0.9840.pt; '
        'mkdir -p checkpoints; mv checkpoint.best_score_0.9840.pt checkpoints/checkpoint.best_score_0.9840.pt'
    )
|
|
|
|
|
|
|
|
# Build a config from fairseq's generation argument parser.
parser = options.get_generation_parser()
input_args = [
    "",
    "--beam=100",
    "--unnormalized",
    "--path=./checkpoints/checkpoint.best_score_0.9840.pt",
]
args = options.parse_args_and_arch(parser, input_args)
cfg = convert_namespace_to_omegaconf(args)

# Disabled alternative loading path, kept for reference only:
#   task = tasks.setup_task(cfg.task)
#   models, cfg = checkpoint_utils.load_model_ensemble(
#       utils.split_paths(cfg.common_eval.path),
#       task=task
#   )

# Load the model ensemble together with its task. Note that this rebinds
# ``cfg`` to the value returned by the loader.
overrides = {"bpe_dir": "utils/BPE"}
_checkpoint_paths = utils.split_paths('./checkpoints/checkpoint.best_score_0.9840.pt')
models, cfg, task = checkpoint_utils.load_model_ensemble_and_task(
    _checkpoint_paths,
    arg_overrides=overrides,
)
|
|
|
|
|
|
|
|
# Switch every loaded model to eval mode, apply the precision/device flags,
# and let it prepare itself for inference with the loaded config.
for model in models:
    model.eval()
    if use_fp16:
        model.half()
    if use_cuda:
        # Only move the model when pipeline model parallelism is off.
        if not cfg.distributed_training.pipeline_model_parallel:
            model.cuda()
    model.prepare_for_inference_(cfg)

# Generator built by the task from the models and the generation config.
generator = task.build_generator(models, cfg.generation)
|
|
|
|
|
|
|
|
from torchvision import transforms

# Per-channel normalisation constants passed to ``transforms.Normalize``.
mean = [0.5] * 3
std = [0.5] * 3


def _to_rgb(image):
    """Return ``image`` converted to RGB mode."""
    return image.convert("RGB")


# Image pipeline: RGB conversion, bicubic resize to a square of side
# ``cfg.task.patch_image_size``, tensor conversion, then normalisation.
patch_resize_transform = transforms.Compose(
    [
        _to_rgb,
        transforms.Resize(
            (cfg.task.patch_image_size, cfg.task.patch_image_size),
            interpolation=Image.BICUBIC,
        ),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

# Special-token tensors and the padding index from the source dictionary.
_src_dict = task.src_dict
bos_item = torch.LongTensor([_src_dict.bos()])
eos_item = torch.LongTensor([_src_dict.eos()])
pad_idx = _src_dict.pad()
|
|
|
|
|
|
|
|
def pre_question(question, max_ques_words):
    """Normalise a question string and cap its number of words.

    The text is lower-cased, leading punctuation from ``,.!?*#:;~`` is
    stripped, ``-`` and ``/`` become spaces, runs of two or more whitespace
    characters collapse to one space, and trailing newlines plus surrounding
    spaces are removed.

    Args:
        question: Raw question text.
        max_ques_words: Maximum number of space-separated words to keep.

    Returns:
        The cleaned question, cut to the first ``max_ques_words`` words when
        it has more than that.
    """
    text = question.lower().lstrip(",.!?*#:;~")
    for separator in ('-', '/'):
        text = text.replace(separator, ' ')
    text = re.sub(r"\s{2,}", ' ', text)
    text = text.rstrip('\n').strip(' ')

    words = text.split(' ')
    if len(words) <= max_ques_words:
        return text
    return ' '.join(words[:max_ques_words])
|
|
|
|
|
def encode_text(text, length=None, append_bos=False, append_eos=False):
    """Encode ``text`` into a long tensor of token ids.

    The text is BPE-encoded with ``task.bpe`` and mapped to ids with
    ``task.tgt_dict`` (no new symbols added, no automatic EOS).

    Args:
        text: String to encode.
        length: If given, keep only the first ``length`` ids; the cut is
            applied before any BOS/EOS item is attached.
        append_bos: Prepend ``bos_item`` when true.
        append_eos: Append ``eos_item`` when true.

    Returns:
        A 1-D long tensor of token ids.
    """
    token_ids = task.tgt_dict.encode_line(
        line=task.bpe.encode(text),
        add_if_not_exist=False,
        append_eos=False,
    ).long()
    if length is not None:
        token_ids = token_ids[:length]

    pieces = [token_ids]
    if append_bos:
        pieces.insert(0, bos_item)
    if append_eos:
        pieces.append(eos_item)

    # Nothing to attach: hand back the encoded tensor itself.
    if len(pieces) == 1:
        return token_ids
    return torch.cat(pieces)
|
|
|
|
|
# Side length (in pixels) that patch images are resized to.
patch_image_size = cfg.task.patch_image_size


def construct_sample(image: Image.Image, question: str):
    """Build a single-item sample dict for ``eval_step`` from an image and a question.

    Args:
        image: PIL image; it is passed through ``patch_resize_transform``.
        question: Free-form text. It is normalised with ``pre_question``
            (capped at ``task.cfg.max_src_length`` words) and given a
            trailing ``?`` if it lacks one.

    Returns:
        A dict with an ``"id"`` array, a ``"net_input"`` dict (source
        tokens, source lengths, patch image, patch mask), the width/height
        resize ratios, and a ``"region_coords"`` tensor.
    """
    w, h = image.size
    # Ratios between the patch size and the original image dimensions.
    w_resize_ratio = torch.tensor(patch_image_size / w).unsqueeze(0)
    h_resize_ratio = torch.tensor(patch_image_size / h).unsqueeze(0)
    patch_image = patch_resize_transform(image).unsqueeze(0)
    patch_mask = torch.tensor([True])

    question = pre_question(question, task.cfg.max_src_length)
    if not question.endswith('?'):
        question = question + '?'

    # The question is wrapped into a fixed prompt before encoding.
    src_text = encode_text(
        ' which region does the text " {} " describe?'.format(question),
        append_bos=True,
        append_eos=True,
    ).unsqueeze(0)

    # Number of non-padding tokens per row.
    src_length = torch.LongTensor([s.ne(pad_idx).long().sum() for s in src_text])

    return {
        "id": np.array(['42']),
        "net_input": {
            "src_tokens": src_text,
            "src_lengths": src_length,
            "patch_images": patch_image,
            "patch_masks": patch_mask,
        },
        "w_resize_ratios": w_resize_ratio,
        "h_resize_ratios": h_resize_ratio,
        # Random values; presumably a placeholder because no reference box
        # exists at inference time - confirm against eval_step.
        "region_coords": torch.randn(1, 4),
    }
|
|
|
|
|
|
|
|
def apply_half(t):
    """Return ``t`` cast to half precision when it is float32, else ``t`` unchanged."""
    return t.to(dtype=torch.half) if t.dtype is torch.float32 else t
|
|
|
|
|
|
|
|
|
|
|
def open_domain_vqa(Image, Question):
    """Return the crop of ``Image`` given by the box predicted for ``Question``.

    Args:
        Image: PIL image supplied by the Gradio image input. The name
            shadows the PIL ``Image`` module inside this function; it is
            kept because it is part of the function's signature.
        Question: Question text supplied by the Gradio textbox.

    Returns:
        The result of ``Image.crop`` on the predicted box, after the box
        corners have been put in order.
    """
    sample = construct_sample(Image, Question)
    if use_cuda:
        sample = utils.move_to_cuda(sample)
    if use_fp16:
        sample = utils.apply_to_sample(apply_half, sample)

    with torch.no_grad():
        result, _ = eval_step(task, generator, models, sample)

    # Box entries are read as (x0, y0, x1, y1) and truncated to ints.
    box = result[0]["box"]
    x0, y0, x1, y1 = (int(box[i]) for i in range(4))

    # Order the corners so that left <= right and top <= bottom. The earlier
    # manual swap assigned to a misspelled name and left ``right`` unchanged.
    left, right = min(x0, x1), max(x0, x1)
    top, bottom = min(y0, y1), max(y0, y1)

    # Avoid a zero-width or zero-height crop by extending the box 10 px.
    if left == right:
        left -= 10
    if top == bottom:
        top -= 10

    return Image.crop((left, top, right, bottom))
|
|
|
|
|
|
|
|
# Text shown at the top of the demo page.
title = "Visual Question Answering"
description = " Demo for Visual Question Answering. That model achieved 3rd place in the Toloka VQA Challenge"

# Example (image path, question) pairs offered in the UI.
examples = [
    ['demo_imgs/000000001093.jpg', 'Which is different from the group?'],
    ['demo_imgs/000000001964.jpg', 'What do we drive for personal use?'],
    ['demo_imgs/000000002473.jpg', 'What do we use to slide on snow?'],
]

# Interface: a PIL image and a textbox in, a PIL image out.
_image_input = gr.inputs.Image(type='pil')
_image_output = gr.outputs.Image(type='pil')
io = gr.Interface(
    fn=open_domain_vqa,
    inputs=[_image_input, "textbox"],
    outputs=_image_output,
    title=title,
    description=description,
    examples=examples,
    allow_flagging=False,
    allow_screenshot=False,
)
io.launch()