import os os.system('cd fairseq;' 'pip install ./; cd ..') os.system('ls -l') import torch import numpy as np import re from utils.eval_utils import eval_step from fairseq import utils,tasks from fairseq import checkpoint_utils from fairseq import distributed_utils, options, tasks, utils from fairseq.dataclass.utils import convert_namespace_to_omegaconf from utils.zero_shot_utils import zero_shot_step from tasks.mm_tasks.vqa_gen import VqaGenTask from tasks.mm_tasks.refcoco import RefcocoTask from models.ofa import OFAModel from PIL import Image from torchvision import transforms import gradio as gr # Register VQA task tasks.register_task('refcoco',RefcocoTask) # turn on cuda if GPU is available use_cuda = torch.cuda.is_available() # use fp16 only when GPU is available use_fp16 = False os.system('wget https://huggingface.co/komleva/VQAmodel/resolve/main/checkpoint.best_score_0.9840.pt; ' 'mkdir -p checkpoints; mv checkpoint.best_score_0.9840.pt checkpoints/checkpoint.best_score_0.9840.pt') # specify some options for evaluation parser = options.get_generation_parser() input_args = ["","--beam=100", "--unnormalized", "--path=./checkpoints/checkpoint.best_score_0.9840.pt"] args = options.parse_args_and_arch(parser, input_args) cfg = convert_namespace_to_omegaconf(args) # Load pretrained ckpt & config """task = tasks.setup_task(cfg.task) models, cfg = checkpoint_utils.load_model_ensemble( utils.split_paths(cfg.common_eval.path), task=task )""" overrides={"bpe_dir":"utils/BPE"} models, cfg, task = checkpoint_utils.load_model_ensemble_and_task( utils.split_paths('./checkpoints/checkpoint.best_score_0.9840.pt'), arg_overrides=overrides ) # Move models to GPU for model in models: model.eval() if use_fp16: model.half() if use_cuda and not cfg.distributed_training.pipeline_model_parallel: model.cuda() model.prepare_for_inference_(cfg) # Initialize generator generator = task.build_generator(models, cfg.generation) # Image transform from torchvision import transforms mean = [0.5, 0.5, 0.5] std = [0.5, 0.5, 0.5] patch_resize_transform = transforms.Compose([ lambda image: image.convert("RGB"), transforms.Resize((cfg.task.patch_image_size, cfg.task.patch_image_size), interpolation=Image.BICUBIC), transforms.ToTensor(), transforms.Normalize(mean=mean, std=std), ]) # Text preprocess bos_item = torch.LongTensor([task.src_dict.bos()]) eos_item = torch.LongTensor([task.src_dict.eos()]) pad_idx = task.src_dict.pad() # Normalize the question def pre_question(question, max_ques_words): question = question.lower().lstrip(",.!?*#:;~").replace('-', ' ').replace('/', ' ') question = re.sub( r"\s{2,}", ' ', question, ) question = question.rstrip('\n') question = question.strip(' ') # truncate question question_words = question.split(' ') if len(question_words) > max_ques_words: question = ' '.join(question_words[:max_ques_words]) return question def encode_text(text, length=None, append_bos=False, append_eos=False): s = task.tgt_dict.encode_line( line=task.bpe.encode(text), add_if_not_exist=False, append_eos=False ).long() if length is not None: s = s[:length] if append_bos: s = torch.cat([bos_item, s]) if append_eos: s = torch.cat([s, eos_item]) return s patch_image_size = cfg.task.patch_image_size # Construct input for open-domain VQA task def construct_sample(image: Image, question: str): w, h = image.size w_resize_ratio = torch.tensor(patch_image_size / w).unsqueeze(0) h_resize_ratio = torch.tensor(patch_image_size / h).unsqueeze(0) patch_image = patch_resize_transform(image).unsqueeze(0) patch_mask = torch.tensor([True]) question = pre_question(question, task.cfg.max_src_length) question = question + '?' if not question.endswith('?') else question #src_text = encode_text(' {}'.format(question), append_bos=True, append_eos=True).unsqueeze(0) src_text = encode_text(' which region does the text " {} " describe?'.format(question), append_bos=True, append_eos=True).unsqueeze(0) src_length = torch.LongTensor([s.ne(pad_idx).long().sum() for s in src_text]) ref_dict = np.array([{'yes': 1.0}]) # just placeholder sample = { "id":np.array(['42']), "net_input": { "src_tokens": src_text, "src_lengths": src_length, "patch_images": patch_image, "patch_masks": patch_mask, }, "w_resize_ratios": w_resize_ratio, "h_resize_ratios": h_resize_ratio, "region_coords": torch.randn(1, 4) } return sample # Function to turn FP32 to FP16 def apply_half(t): if t.dtype is torch.float32: return t.to(dtype=torch.half) return t # Function for image captioning def open_domain_vqa(Image, Question): sample = construct_sample(Image, Question) sample = utils.move_to_cuda(sample) if use_cuda else sample sample = utils.apply_to_sample(apply_half, sample) if use_fp16 else sample # Run eval step for open-domain VQA with torch.no_grad(): #result, scores = zero_shot_step(task, generator, models, sample) result, scores = eval_step(task, generator, models, sample) left = int(result[0]["box"][0]) right = int(result[0]["box"][2]) top = int(result[0]["box"][1]) bottom= int(result[0]["box"][3]) if left >right: t = left left = right rights = t if bottom

" examples = [['demo_imgs/000000001093.jpg', 'Which is different from the group?'], ['demo_imgs/000000001964.jpg','What do we drive for personal use?'], ['demo_imgs/000000002473.jpg','What do we use to slide on snow?']] io = gr.Interface(fn=open_domain_vqa, inputs=[gr.inputs.Image(type='pil'), "textbox"], outputs=gr.outputs.Image(type='pil'), title=title, description=description, examples=examples, allow_flagging=False, allow_screenshot=False) io.launch()