| |
| import os |
| from datetime import datetime |
| import subprocess |
| import time |
| import uuid |
| import io |
| from threading import Thread |
|
|
| |
| import numpy as np |
| import torch |
| from PIL import Image |
| import accelerate |
| import gradio as gr |
| import spaces |
| from transformers import ( |
| Qwen2_5_VLForConditionalGeneration, |
| AutoTokenizer, |
| AutoProcessor, |
| TextIteratorStreamer |
| ) |
|
|
| |
| from qwen_vl_utils import process_vision_info |
|
|
| |
# Choose the compute device once at import time.
# Preference order: CUDA GPU, then Apple-silicon MPS, then plain CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available() and torch.backends.mps.is_built():
    device = "mps"

print(f"[INFO] Using device: {device}")
|
|
| |
# Extension tables used to classify uploads.
# NOTE: the two tables use different conventions. Pillow's registry is a dict
# keyed by *dotted* extensions (".png", ".jpg", ...), while the video tuple
# below holds *bare* names without the leading dot.
image_extensions = Image.registered_extensions()
video_extensions = (
    "avi",
    "mp4",
    "mov",
    "mkv",
    "flv",
    "wmv",
    "mjpeg",
    "gif",
    "webm",
    "m4v",
    "3gp",
)
|
|
|
|
def identify_and_save_blob(blob_path):
    """Classify a file as image or video by content and save a uniquely named copy.

    The file's bytes are probed with Pillow. If they verify as an image, the
    copy is saved with a ``.png`` suffix; otherwise the blob is treated as a
    video and the copy keeps the original suffix when it is a known video
    extension, falling back to ``.mp4``.

    Args:
        blob_path: Path of the file to inspect.

    Returns:
        A ``(filename, media_type)`` tuple. ``filename`` is the newly written
        ``temp_<uuid>_media<ext>`` file (a relative path, so it is created in
        the current working directory) and ``media_type`` is ``"image"`` or
        ``"video"``.

    Raises:
        ValueError: If ``blob_path`` does not exist, or if reading, probing or
            copying the file fails for any other reason. The original
            exception is attached as ``__cause__``.
    """
    try:
        with open(blob_path, "rb") as src:
            blob_content = src.read()

        try:
            # verify() checks file integrity without decoding pixel data; the
            # context manager releases the image object afterwards.
            with Image.open(io.BytesIO(blob_content)) as probe:
                probe.verify()
            # NOTE: the copy is byte-for-byte, so ".png" is only a label here;
            # the real image format is whatever the uploaded bytes contain.
            extension = ".png"
            media_type = "image"
        except (IOError, SyntaxError):
            # Not an image -> assume video. splitext() returns a dotted suffix
            # (".mov") whereas video_extensions stores bare names ("mov"), so
            # strip the dot before the membership test.
            ext = os.path.splitext(blob_path)[1].lower()
            extension = ext if ext.lstrip(".") in video_extensions else ".mp4"
            media_type = "video"

        # The "temp_" prefix marks the file as a disposable copy for callers.
        filename = f"temp_{uuid.uuid4()}_media{extension}"
        with open(filename, "wb") as dst:
            dst.write(blob_content)

        return filename, media_type

    except FileNotFoundError as err:
        raise ValueError(f"The file {blob_path} was not found.") from err
    except Exception as err:
        raise ValueError(f"An error occurred while processing the file: {err}") from err
|
|
|
|
| |
| |
# Checkpoints offered by the demo, in the order they are loaded.
_MODEL_IDS = (
    "Qwen/Qwen2.5-VL-7B-Instruct",
    "Qwen/Qwen2.5-VL-3B-Instruct",
)

# Every checkpoint is loaded eagerly at import time and switched to eval mode.
models = {
    model_id: Qwen2_5_VLForConditionalGeneration.from_pretrained(
        model_id,
        trust_remote_code=True,
        torch_dtype="auto",
        device_map="auto",
    ).eval()
    for model_id in _MODEL_IDS
}

# Matching processor for each checkpoint, keyed by the same model id.
processors = {
    model_id: AutoProcessor.from_pretrained(model_id, trust_remote_code=True)
    for model_id in _MODEL_IDS
}

# Markdown shown at the top of the UI.
DESCRIPTION = "[Qwen2.5-VL Demo](https://huggingface.co/collections/Qwen/qwen25-vl-6795ffac22b334a837c0f9a5)"
|
|
def _resolve_media(media_input):
    """Map the raw UI input to ``(media_path, media_type, owns_file)``.

    ``owns_file`` is True when this function (or ``identify_and_save_blob``)
    created ``media_path`` and the caller is therefore responsible for
    deleting it.

    Raises:
        gr.Error: If the input is neither an array nor a path, or if a path
            with an unrecognised extension cannot be classified.
    """
    if isinstance(media_input, np.ndarray):
        # A timestamp alone collides when two requests arrive within the same
        # second, so a short random suffix is appended.
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"image_{stamp}_{uuid.uuid4().hex[:8]}.png"
        Image.fromarray(np.uint8(media_input)).save(filename)
        return os.path.abspath(filename), "image", True

    if not isinstance(media_input, str):
        raise gr.Error("Unsupported input type for media. Please upload an image or video.")

    ext = os.path.splitext(media_input)[1].lower()
    if ext in image_extensions:
        return media_input, "image", False
    # video_extensions stores bare names ("mp4") while splitext() yields a
    # dotted suffix (".mp4"); strip the dot so known videos are recognised.
    if ext.lstrip(".") in video_extensions:
        return media_input, "video", False

    # Unknown or missing extension: sniff the content and work on a copy.
    try:
        media_path, media_type = identify_and_save_blob(media_input)
    except Exception as e:
        print(f"Error identifying blob: {e}")
        raise gr.Error(
            "Unsupported media type. Please upload an image (PNG, JPG, etc.) or a video (MP4, AVI, etc.)."
        ) from e
    print(f"Identified blob as: {media_type}, saved to: {media_path}")
    return media_path, media_type, True


@spaces.GPU
def run_example(media_input, text_input=None, model_id=None):
    """Stream a Qwen2.5-VL answer about an uploaded image or video.

    Args:
        media_input: A file path (str) or an image array (``np.ndarray``).
        text_input: Optional user prompt; a generic "what is in this" question
            is used when empty.
        model_id: Key into the module-level ``models`` / ``processors`` dicts.

    Yields:
        ``(text_so_far, None)`` for every streamed chunk, followed by a final
        ``(full_text, "<seconds> seconds")`` tuple carrying the elapsed time.

    Raises:
        gr.Error: If no media or model is given, the model id is unknown, or
            the media cannot be classified.
    """
    if media_input is None:
        raise gr.Error("No media provided. Please upload an image or video before submitting.")
    if model_id is None:
        raise gr.Error("No model selected. Please select a model.")
    if model_id not in models:
        # Surface a readable UI error instead of a bare KeyError.
        raise gr.Error(f"Unknown model: {model_id}. Please select a model.")

    start_time = time.time()
    media_path, media_type, owns_file = _resolve_media(media_input)

    # try/finally guarantees that files created for this request are removed
    # even when processing fails or the generator is closed mid-stream.
    try:
        print(f"[INFO] Processing {media_type} from {media_path}")

        model = models[model_id]
        processor = processors[model_id]

        # Build the single-turn chat message: one media item plus one prompt.
        if media_type == "image":
            content_list = [{"type": "image", "image": media_path}]
        else:
            content_list = [{"type": "video", "video": media_path, "fps": 8.0}]
        content_list.append(
            {"type": "text", "text": text_input or "What is in this image/video?"}
        )
        messages = [{"role": "user", "content": content_list}]

        # Render the prompt and load the vision inputs, then batch them.
        text = processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        ).to(device)

        streamer = TextIteratorStreamer(
            processor, skip_prompt=True, skip_special_tokens=True
        )
        generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024)

        # generate() blocks, so it runs in a worker thread while this
        # generator drains the streamer and forwards partial text to the UI.
        thread = Thread(target=model.generate, kwargs=generation_kwargs)
        thread.start()

        buffer = ""
        for new_text in streamer:
            buffer += new_text
            yield buffer, None

        # The streamer is exhausted, so generation is done; reap the worker.
        thread.join()

        total_time = round(time.time() - start_time, 2)
        yield buffer, f"{total_time} seconds"
    finally:
        if owns_file and media_path and os.path.exists(media_path):
            os.remove(media_path)
            print(f"[INFO] Cleaned up temporary file: {media_path}")
|
|
|
|
# Styling for the element with id "output": fixed height, scrollable, bordered.
# NOTE(review): no component below sets elem_id="output", so this rule looks
# unused - confirm whether the output textbox was meant to carry that id.
css = """
#output {
height: 500px;
overflow: auto;
border: 1px solid #ccc;
}
"""

# Two-column layout: inputs (file, model, prompt, submit) on the left,
# streamed answer and timing on the right.
with gr.Blocks(css=css) as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Tab(label="Qwen2.5-VL Input"):
        with gr.Row():
            with gr.Column():
                input_media = gr.File(
                    label="Upload Image or Video (JPG, PNG, MP4, AVI, etc.)",
                    type="filepath",
                )
                model_selector = gr.Dropdown(
                    choices=list(models),
                    label="Model",
                    value="Qwen/Qwen2.5-VL-7B-Instruct",
                )
                text_input = gr.Textbox(label="Text Prompt")
                submit_btn = gr.Button(value="Submit")
            with gr.Column():
                output_text = gr.Textbox(label="Output Text", interactive=False)
                time_taken = gr.Textbox(
                    label="Time taken for processing + inference",
                    interactive=False,
                )

        # run_example is a generator, so both outputs update as it yields.
        submit_btn.click(
            fn=run_example,
            inputs=[input_media, text_input, model_selector],
            outputs=[output_text, time_taken],
        )

demo.launch(debug=True)