# NOTE: third-party imports keep their original relative order (in particular
# `spaces` is imported before `torch`) so import-time side effects are unchanged.
import os
import random
import uuid
import json
import time
import asyncio
from threading import Thread
from pathlib import Path
from io import BytesIO
from typing import Optional, Tuple, Dict, Any, Iterator, List

import gradio as gr
import spaces
import torch
import numpy as np
from PIL import Image
import cv2
import requests
import fitz  # PyMuPDF

from transformers import (
    Qwen3VLMoeForConditionalGeneration,
    AutoProcessor,
    TextIteratorStreamer,
)
from transformers.image_utils import load_image

# Constants for text generation
MAX_MAX_NEW_TOKENS = 4096
DEFAULT_MAX_NEW_TOKENS = 2048

# Let the environment (e.g., Hugging Face Spaces) determine the device.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("CUDA_VISIBLE_DEVICES=", os.environ.get("CUDA_VISIBLE_DEVICES"))
print("torch.__version__ =", torch.__version__)
print("torch.version.cuda =", torch.version.cuda)
print("cuda available:", torch.cuda.is_available())
print("cuda device count:", torch.cuda.device_count())
if torch.cuda.is_available():
    print("current device:", torch.cuda.current_device())
    print("device name:", torch.cuda.get_device_name(torch.cuda.current_device()))
print("Using device:", device)

# --- Model Loading ---
# Load Qwen3VL
MODEL_ID_Q3VL = "Qwen/Qwen3-VL-30B-A3B-Instruct"
processor_q3vl = AutoProcessor.from_pretrained(MODEL_ID_Q3VL, trust_remote_code=True, use_fast=False)
model_q3vl = Qwen3VLMoeForConditionalGeneration.from_pretrained(
    MODEL_ID_Q3VL,
    trust_remote_code=True,
    dtype=torch.float16
).to(device).eval()


def downsample_video(video_path, num_frames: int = 10):
    """
    Downsamples the video to evenly spaced frames.

    Args:
        video_path: Path of the video, handed to ``cv2.VideoCapture``.
        num_frames: Maximum number of frames to sample (default 10).

    Returns:
        A list of RGB ``PIL.Image`` frames. Frames that fail to decode are
        skipped, so the list may be shorter than ``num_frames``; it is empty
        when the capture reports no frames.
    """
    frames = []
    vidcap = cv2.VideoCapture(video_path)
    try:
        total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
        # A non-positive count would make np.linspace receive a negative
        # sample count and raise; report "no frames" instead.
        if total_frames <= 0 or num_frames <= 0:
            return frames
        frame_indices = np.linspace(
            0, total_frames - 1, min(total_frames, num_frames), dtype=int
        )
        for frame_index in frame_indices:
            vidcap.set(cv2.CAP_PROP_POS_FRAMES, int(frame_index))
            success, image = vidcap.read()
            if success:
                # OpenCV decodes to BGR; PIL expects RGB.
                frames.append(Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB)))
    finally:
        # Release the capture even if decoding raised.
        vidcap.release()
    return frames


def convert_pdf_to_images(file_path: str, dpi: int = 200):
    """
    Converts a PDF file into a list of PIL Images.

    Args:
        file_path: Path of the PDF; a falsy value yields an empty list.
        dpi: Render resolution; pages are scaled by ``dpi / 72``.

    Returns:
        One ``PIL.Image`` per page, in page order.
    """
    if not file_path:
        return []
    zoom = dpi / 72.0
    mat = fitz.Matrix(zoom, zoom)
    images = []
    # The context manager closes the document even if a page fails to render.
    with fitz.open(file_path) as pdf_document:
        for page in pdf_document:
            pix = page.get_pixmap(matrix=mat)
            images.append(Image.open(BytesIO(pix.tobytes("png"))))
    return images


def get_initial_pdf_state() -> Dict[str, Any]:
    """Returns the default initial state for the PDF viewer."""
    return {"pages": [], "total_pages": 0, "current_page_index": 0}


def _status_html(message: str) -> str:
    """Return the page-indicator string for *message*.

    NOTE(review): in the file as received these literals consist of the text
    surrounded by newlines; any HTML markup around them appears to have been
    lost upstream. The text is preserved as-is -- confirm the intended markup.
    """
    return f"\n{message}\n"


def load_and_preview_pdf(file_path: Optional[str]) -> Tuple[Optional[Image.Image], Dict[str, Any], str]:
    """
    Loads a PDF, converts pages to images, and prepares the state for preview.

    Returns:
        ``(first_page_image_or_None, state, page_info_html)``. On any failure
        the state is the initial (empty) state and the message explains why.
    """
    state = get_initial_pdf_state()
    if not file_path:
        return None, state, _status_html("No file loaded")
    try:
        pages = convert_pdf_to_images(file_path)
    except Exception as e:  # UI boundary: report the failure instead of crashing the event
        return None, state, _status_html(f"Failed to load preview: {e}")
    if not pages:
        return None, state, _status_html("Could not load file")
    state["pages"] = pages
    state["total_pages"] = len(pages)
    return pages[0], state, _status_html(f'Page 1 / {state["total_pages"]}')


def navigate_pdf_page(direction: str, state: Dict[str, Any]):
    """
    Navigates to the previous or next page in the PDF preview.

    Args:
        direction: ``"prev"`` or ``"next"``; any other value keeps the page.
        state: Viewer state as produced by ``load_and_preview_pdf``.

    Returns:
        ``(page_image_or_None, state, page_info_html)``; the index is clamped
        to the first/last page and written back into ``state``.
    """
    if not state or not state.get("pages"):
        return None, state, _status_html("No file loaded")
    total_pages = state["total_pages"]
    step = {"prev": -1, "next": 1}.get(direction, 0)
    new_index = min(max(state["current_page_index"] + step, 0), total_pages - 1)
    state["current_page_index"] = new_index
    return state["pages"][new_index], state, _status_html(f"Page {new_index + 1} / {total_pages}")


def _stream_generation(
    text: str,
    images: List[Image.Image],
    max_new_tokens: int,
    temperature: float,
    top_p: float,
    top_k: int,
    repetition_penalty: float,
) -> Iterator[str]:
    """Run the model on *images* + *text* and yield the accumulated reply.

    Builds one user message with an image placeholder per image followed by
    the text, starts ``model_q3vl.generate`` on a worker thread and yields the
    growing output string after each streamed chunk. The sampling parameters
    are forwarded to ``generate`` for every caller.

    Raises:
        Exception: whatever ``generate`` raised on the worker thread, re-raised
            here after the stream has been closed.
    """
    content = [{"type": "image"} for _ in images]
    content.append({"type": "text", "text": text})
    messages = [{"role": "user", "content": content}]
    prompt_full = processor_q3vl.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor_q3vl(text=[prompt_full], images=list(images), return_tensors="pt", padding=True).to(device)
    streamer = TextIteratorStreamer(processor_q3vl, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        **inputs,
        "streamer": streamer,
        "max_new_tokens": int(max_new_tokens),
        "do_sample": True,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": int(top_k),
        "repetition_penalty": repetition_penalty,
    }

    errors: List[BaseException] = []

    def _run_generate() -> None:
        try:
            model_q3vl.generate(**generation_kwargs)
        except Exception as exc:
            # Without this the consumer loop below would wait forever on the
            # streamer queue when generation fails.
            errors.append(exc)
            streamer.end()

    thread = Thread(target=_run_generate)
    thread.start()
    buffer = ""
    for new_text in streamer:
        buffer += new_text
        buffer = buffer.replace("<|im_end|>", "")
        time.sleep(0.01)
        yield buffer
    thread.join()
    if errors:
        raise errors[0]


@spaces.GPU
def generate_image(text: str, image: Image.Image, max_new_tokens: int = 1024, temperature: float = 0.6,
                   top_p: float = 0.9, top_k: int = 50, repetition_penalty: float = 1.2):
    """
    Generates responses for a single image input.

    Yields ``(text, text)`` pairs -- the same accumulated string for the raw
    and the markdown output -- or a single prompt-to-upload message when no
    image is given.
    """
    if image is None:
        yield "Please upload an image.", "Please upload an image."
        return
    for buffer in _stream_generation(text, [image], max_new_tokens, temperature, top_p, top_k, repetition_penalty):
        yield buffer, buffer


@spaces.GPU
def generate_video(text: str, video_path: str, max_new_tokens: int = 1024, temperature: float = 0.6,
                   top_p: float = 0.9, top_k: int = 50, repetition_penalty: float = 1.2):
    """
    Generates responses for a video input by processing downsampled frames.

    Yields ``(text, text)`` pairs with the accumulated output, or a single
    message when no video is given or no frame could be read.
    """
    if video_path is None:
        yield "Please upload a video.", "Please upload a video."
        return
    frames = downsample_video(video_path)
    if not frames:
        yield "Could not process video.", "Could not process video."
        return
    for buffer in _stream_generation(text, frames, max_new_tokens, temperature, top_p, top_k, repetition_penalty):
        yield buffer, buffer


@spaces.GPU
def generate_pdf(text: str, state: Dict[str, Any], max_new_tokens: int = 2048, temperature: float = 0.6,
                 top_p: float = 0.9, top_k: int = 50, repetition_penalty: float = 1.2):
    """
    Processes a PDF file page by page using the pre-loaded images from the state.

    Each page is sent to the model separately with the same query. The yielded
    ``(text, text)`` pairs contain all finished pages plus the page currently
    being generated, each introduced by a ``--- Page i/n ---`` header.
    """
    if not state or not state.get("pages"):
        yield "Please upload a PDF file first.", "Please upload a PDF file first."
        return
    page_images = state["pages"]
    full_response = ""
    for page_number, image in enumerate(page_images, start=1):
        page_header = f"--- Page {page_number}/{len(page_images)} ---\n"
        prefix = full_response + page_header
        # Show the header immediately, before the first token arrives.
        yield prefix, prefix
        page_buffer = ""
        for page_buffer in _stream_generation(
            text, [image], max_new_tokens, temperature, top_p, top_k, repetition_penalty
        ):
            current = prefix + page_buffer
            yield current, current
        full_response = prefix + page_buffer + "\n\n"


# --- Gradio Interface ---
image_examples = [
    ["Describe the safety measures in the image. Conclude (Safe / Unsafe)..", "images/5.jpg"],
    ["Convert this page to doc [markdown] precisely.", "images/3.png"],
]
video_examples = [["Explain the video in detail.", "videos/2.mp4"]]
pdf_examples = [["examples/sample-doc.pdf"]]

css = """
/* From Uiverse.io by mi-series */
body {
  background: #FCEDDA;
}
.gradio-container {
  background: #FCEDDA;
}
.container {
  position: relative;
  max-width: 500px;
  width: 100%;
  background: #FCEDDA;
  padding: 25px;
  border-radius: 8px;
  box-shadow: 0 0 15px rgba(0, 0, 0, 0.1);
}
.container header {
  font-size: 1.2rem;
  color: #000;
  font-weight: 600;
  text-align: center;
}
.container .form {
  margin-top: 15px;
}
.form .input-box {
  width: 100%;
  margin-top: 10px;
}
.input-box label {
  color: #000;
}
.form :where(.input-box input, .select-box) {
  position: relative;
  height: 35px;
  width: 100%;
  outline: none;
  font-size: 1rem;
  color: #808080;
  margin-top: 5px;
  border: 1px solid #EE4E34;
  border-radius: 6px;
  padding: 0 15px;
  background: #FCEDDA;
}
.input-box input:focus {
  box-shadow: 0 1px 0 rgba(0, 0, 0, 0.1);
}
.form .column {
  display: flex;
  column-gap: 15px;
}
.form .gender-box {
  margin-top: 10px;
}
.form :where(.gender-option, .gender) {
  display: flex;
  align-items: center;
  column-gap: 50px;
  flex-wrap: wrap;
}
.form .gender {
  column-gap: 5px;
}
.gender input {
  accent-color: #EE4E34;
}
.form :where(.gender input, .gender label) {
  cursor: pointer;
}
.gender label {
  color: #000;
}
.address :where(input, .select-box) {
  margin-top: 10px;
}
.select-box select {
  height: 100%;
  width: 100%;
  outline: none;
  border: none;
  color: #808080;
  font-size: 1rem;
  background: #FCEDDA;
}
.form button, .submit-btn {
  height: 40px;
  width: 100%;
  color: #000 !important;
  font-size: 1rem;
  font-weight: 400;
  margin-top: 15px;
  border: none;
  border-radius: 6px;
  cursor: pointer;
  transition: all 0.2s ease;
  background: #EE4E34 !important;
}
.form button:hover, .submit-btn:hover {
  background: #EE3E34 !important;
}
.canvas-output {
  border: 2px solid #EE4E34;
  border-radius: 10px;
  padding: 20px;
  background: #FCEDDA;
}
"""

with gr.Blocks(css=css) as demo:
    pdf_state = gr.State(value=get_initial_pdf_state())
    gr.Markdown("# **Qwen3-VL-Processor**")
    with gr.Row():
        with gr.Column(scale=2):
            with gr.Tabs():
                with gr.TabItem("Image Inference"):
                    image_query = gr.Textbox(label="Query Input", placeholder="Enter your query here...")
                    image_upload = gr.Image(type="pil", label="Image", height=290)
                    image_submit = gr.Button("Submit", elem_classes="submit-btn")
                    gr.Examples(examples=image_examples, inputs=[image_query, image_upload])
                with gr.TabItem("Video Inference"):
                    video_query = gr.Textbox(label="Query Input", placeholder="Enter your query here...")
                    video_upload = gr.Video(label="Video", height=290)
                    video_submit = gr.Button("Submit", elem_classes="submit-btn")
                    gr.Examples(examples=video_examples, inputs=[video_query, video_upload])
                with gr.TabItem("PDF Inference"):
                    with gr.Row():
                        with gr.Column(scale=1):
                            pdf_query = gr.Textbox(label="Query Input", placeholder="e.g., 'Summarize this document'")
                            pdf_upload = gr.File(label="Upload PDF", file_types=[".pdf"])
                            #gr.Examples(examples=pdf_examples, inputs=[pdf_upload])
                            pdf_submit = gr.Button("Submit", elem_classes="submit-btn")
                        with gr.Column(scale=1):
                            pdf_preview_img = gr.Image(label="PDF Preview", height=290)
                            with gr.Row():
                                prev_page_btn = gr.Button("◀ Previous")
                                page_info = gr.HTML(_status_html("No file loaded"))
                                next_page_btn = gr.Button("Next ▶")
            with gr.Accordion("Advanced options", open=False):
                max_new_tokens = gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS)
                temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6)
                top_p = gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9)
                top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
                repetition_penalty = gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2)
        with gr.Column(scale=3):
            with gr.Column(elem_classes="canvas-output"):
                gr.Markdown("## Output")
                output = gr.Textbox(label="Raw Output Stream", interactive=False, lines=14, show_copy_button=True)
                with gr.Accordion("(Result.md)", open=False):
                    markdown_output = gr.Markdown(label="(Result.Md)")

    # Event handlers
    image_submit.click(
        fn=generate_image,
        inputs=[image_query, image_upload, max_new_tokens, temperature, top_p, top_k, repetition_penalty],
        outputs=[output, markdown_output]
    )
    video_submit.click(
        fn=generate_video,
        inputs=[video_query, video_upload, max_new_tokens, temperature, top_p, top_k, repetition_penalty],
        outputs=[output, markdown_output]
    )
    pdf_submit.click(
        fn=generate_pdf,
        inputs=[pdf_query, pdf_state, max_new_tokens, temperature, top_p, top_k, repetition_penalty],
        outputs=[output, markdown_output]
    )
    pdf_upload.change(
        fn=load_and_preview_pdf,
        inputs=[pdf_upload],
        outputs=[pdf_preview_img, pdf_state, page_info]
    )
    prev_page_btn.click(
        fn=lambda s: navigate_pdf_page("prev", s),
        inputs=[pdf_state],
        outputs=[pdf_preview_img, pdf_state, page_info]
    )
    next_page_btn.click(
        fn=lambda s: navigate_pdf_page("next", s),
        inputs=[pdf_state],
        outputs=[pdf_preview_img, pdf_state, page_info]
    )

if __name__ == "__main__":
    demo.queue(max_size=50).launch(mcp_server=True, ssr_mode=False, show_error=True)