from __future__ import annotations

import os
import string

import gradio as gr
import PIL.Image
import torch
from transformers import AutoProcessor, Blip2ForConditionalGeneration

DESCRIPTION = "# [BLIP-2](https://github.com/salesforce/LAVIS/tree/main/projects/blip2)"

if not torch.cuda.is_available():
    DESCRIPTION += "\n<p>Running on CPU 🥶 This demo does not work on CPU.</p>"

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

MODEL_ID_OPT_6_7B = "Salesforce/blip2-opt-6.7b"
MODEL_ID_FLAN_T5_XXL = "Salesforce/blip2-flan-t5-xxl"
# The checkpoint can be overridden via the MODEL_ID environment variable,
# e.g. to the smaller OPT-6.7B model defined above.
MODEL_ID = os.getenv("MODEL_ID", MODEL_ID_FLAN_T5_XXL)

if torch.cuda.is_available():
    processor = AutoProcessor.from_pretrained(MODEL_ID)
    # 8-bit loading (via bitsandbytes) lets these multi-billion-parameter
    # checkpoints fit on a single GPU; device_map="auto" places the layers.
    model = Blip2ForConditionalGeneration.from_pretrained(MODEL_ID, device_map="auto", load_in_8bit=True)
else:
    processor = None
    model = None

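# NOTE: the functions below use the module-level `processor`/`model` and assume
# the CUDA branch above ran; on a CPU-only machine both are None and calls fail.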
def generate_caption(
    image: PIL.Image.Image,
    decoding_method: str,
    temperature: float,
    length_penalty: float,
    repetition_penalty: float,
) -> str:
    """Caption `image` using the decoding settings chosen in the UI."""
    inputs = processor(images=image, return_tensors="pt").to(device, torch.float16)
    generated_ids = model.generate(
        pixel_values=inputs.pixel_values,
        # Beam search unless the user picked nucleus sampling in the UI.
        do_sample=decoding_method == "Nucleus sampling",
        temperature=temperature,
        length_penalty=length_penalty,
        repetition_penalty=repetition_penalty,
        max_length=50,
        min_length=1,
        num_beams=5,
        top_p=0.9,
    )
    return processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()


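# `text` is expected to be a full prompt in the "Question: ... Answer:" form
# that chat() below constructs; the model completes the text after "Answer:".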
def answer_question(
    image: PIL.Image.Image,
    text: str,
    decoding_method: str,
    temperature: float,
    length_penalty: float,
    repetition_penalty: float,
) -> str:
    """Generate an answer conditioned on `image` and the prompt in `text`."""
    inputs = processor(images=image, text=text, return_tensors="pt").to(device, torch.float16)
    generated_ids = model.generate(
        **inputs,
        do_sample=decoding_method == "Nucleus sampling",
        temperature=temperature,
        length_penalty=length_penalty,
        repetition_penalty=repetition_penalty,
        max_length=30,
        min_length=1,
        num_beams=5,
        top_p=0.9,
    )
    return processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()


def postprocess_output(output: str) -> str:
    """Append a period when the answer does not already end with punctuation."""
    if output and output[-1] not in string.punctuation:
        output += "."
    return output


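# chat() keeps two parallel histories: `history_orig` holds the raw turns shown
# in the chatbot, while `history_qa` holds the same turns wrapped in the
# "Question: ... Answer:" template; joining `history_qa` produces the running
# multi-turn prompt fed to the model.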
def chat(
    image: PIL.Image.Image,
    text: str,
    decoding_method: str,
    temperature: float,
    length_penalty: float,
    repetition_penalty: float,
    history_orig: list[str] | None = None,
    history_qa: list[str] | None = None,
) -> tuple[dict[str, list[str]], dict[str, list[str]], dict[str, list[str]]]:
    """Run one VQA turn and return updates for the chatbot and both histories."""
    # Use None defaults instead of mutable `[]` defaults, which would be shared
    # across calls; Gradio always passes the State values, but the old defaults
    # were a latent bug.
    history_orig = history_orig if history_orig is not None else []
    history_qa = history_qa if history_qa is not None else []

    history_orig.append(text)
    text_qa = f"Question: {text} Answer:"
    history_qa.append(text_qa)
    prompt = " ".join(history_qa)

    output = answer_question(
        image,
        prompt,
        decoding_method,
        temperature,
        length_penalty,
        repetition_penalty,
    )
    output = postprocess_output(output)
    history_orig.append(output)
    history_qa.append(output)

    # history_orig alternates user/assistant turns, so pairing even and odd
    # entries yields the (user, bot) tuples gr.Chatbot expects.
    chat_val = list(zip(history_orig[0::2], history_orig[1::2]))
    return gr.update(value=chat_val), gr.update(value=history_orig), gr.update(value=history_qa)


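# (image file, prompt) pairs for gr.Examples; the image files are assumed to
# live alongside this script (they are referenced by bare filename).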
examples = [
    [
        "house.png",
        "How could someone get out of the house?",
    ],
    [
        "flower.jpg",
        "What is this flower and where is its origin?",
    ],
    [
        "pizza.jpg",
        "What are the steps to cook it?",
    ],
    [
        "sunset.jpg",
        "Here is a romantic message going along with the photo:",
    ],
    [
        "forbidden_city.webp",
        "In what dynasties was this place built?",
    ],
]

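# gr.State holds per-session values, so each visitor gets independent chat
# histories even when multiple sessions are active at once.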
with gr.Blocks(css="style.css") as demo:
    gr.Markdown(DESCRIPTION)
    gr.DuplicateButton(
        value="Duplicate Space for private use",
        elem_id="duplicate-button",
        visible=os.getenv("SHOW_DUPLICATE_BUTTON") == "1",
    )

    image = gr.Image(type="pil")
    with gr.Accordion(label="Advanced settings", open=False):
        sampling_method = gr.Radio(
            label="Text Decoding Method",
            choices=["Beam search", "Nucleus sampling"],
            value="Beam search",
        )
        temperature = gr.Slider(
            label="Temperature (used with nucleus sampling)",
            minimum=0.5,
            maximum=1.0,
            value=1.0,
            step=0.1,
        )
        length_penalty = gr.Slider(
            label="Length Penalty (larger values favor longer sequences; used with beam search)",
            minimum=-1.0,
            maximum=2.0,
            value=1.0,
            step=0.2,
        )
        rep_penalty = gr.Slider(
            label="Repetition Penalty (larger values discourage repetition)",
            minimum=1.0,
            maximum=5.0,
            value=1.5,
            step=0.5,
        )
    with gr.Row():
        with gr.Column():
            with gr.Box():
                caption_button = gr.Button(value="Caption it!")
                caption_output = gr.Textbox(label="Caption Output", show_label=False).style(container=False)
        with gr.Column():
            with gr.Box():
                chatbot = gr.Chatbot(label="VQA Chat")
                history_orig = gr.State(value=[])
                history_qa = gr.State(value=[])
                vqa_input = gr.Text(label="Chat Input", show_label=False, max_lines=1).style(container=False)
                with gr.Row():
                    clear_chat_button = gr.Button(value="Clear")
                    chat_button = gr.Button(value="Submit")

    gr.Examples(
        examples=examples,
        inputs=[image, vqa_input],
    )

    caption_button.click(
        fn=generate_caption,
        inputs=[
            image,
            sampling_method,
            temperature,
            length_penalty,
            rep_penalty,
        ],
        outputs=caption_output,
        api_name="caption",
    )

    chat_inputs = [
        image,
        vqa_input,
        sampling_method,
        temperature,
        length_penalty,
        rep_penalty,
        history_orig,
        history_qa,
    ]
    chat_outputs = [
        chatbot,
        history_orig,
        history_qa,
    ]
    vqa_input.submit(
        fn=chat,
        inputs=chat_inputs,
        outputs=chat_outputs,
    )
    chat_button.click(
        fn=chat,
        inputs=chat_inputs,
        outputs=chat_outputs,
        api_name="chat",
    )
    clear_chat_button.click(
        fn=lambda: ("", [], [], []),
        inputs=None,
        outputs=[
            vqa_input,
            chatbot,
            history_orig,
            history_qa,
        ],
        queue=False,
        api_name="clear",
    )
    # Changing the image invalidates both the caption and the chat history.
    image.change(
        fn=lambda: ("", [], [], []),
        inputs=None,
        outputs=[
            caption_output,
            chatbot,
            history_orig,
            history_qa,
        ],
        queue=False,
    )

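# demo.queue() routes requests through Gradio's queue (processed serially by
# default), so concurrent visitors do not hit the GPU at the same time;
# max_size bounds how many requests may wait.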
if __name__ == "__main__":
    demo.queue(max_size=10).launch()