import os
import random
from threading import Thread

import gradio as gr
import spaces
import torch
import numpy as np
from PIL import Image, ImageOps
import cv2
from transformers import (
    Qwen2_5_VLForConditionalGeneration,
    AutoModelForVision2Seq,
    AutoProcessor,
    TextIteratorStreamer,
)
import re
import ast

# docling_core is optional; format_smoldocling_output falls back to the raw
# text when it is not installed.
try:
    from docling_core.types.doc import DoclingDocument
    from docling_core.types.doc.document import DocTagsDocument
except ImportError:
    pass
# Constants for text generation
MAX_MAX_NEW_TOKENS = 5120
DEFAULT_MAX_NEW_TOKENS = 3072
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# --- Model Loading ---
# Load Nanonets-OCR-s
MODEL_ID_M = "nanonets/Nanonets-OCR-s"
processor_m = AutoProcessor.from_pretrained(MODEL_ID_M, trust_remote_code=True)
model_m = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID_M,
    trust_remote_code=True,
    torch_dtype=torch.float16
).to(device).eval()

# Load MonkeyOCR
MODEL_ID_G = "echo840/MonkeyOCR"
SUBFOLDER = "Recognition"
processor_g = AutoProcessor.from_pretrained(
    MODEL_ID_G,
    trust_remote_code=True,
    subfolder=SUBFOLDER
)
model_g = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID_G,
    trust_remote_code=True,
    subfolder=SUBFOLDER,
    torch_dtype=torch.float16
).to(device).eval()

# Load Typhoon-OCR-7B
MODEL_ID_L = "scb10x/typhoon-ocr-7b"
processor_l = AutoProcessor.from_pretrained(MODEL_ID_L, trust_remote_code=True)
model_l = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID_L,
    trust_remote_code=True,
    torch_dtype=torch.float16
).to(device).eval()

# Load SmolDocling-256M-preview
MODEL_ID_X = "ds4sd/SmolDocling-256M-preview"
processor_x = AutoProcessor.from_pretrained(MODEL_ID_X, trust_remote_code=True)
model_x = AutoModelForVision2Seq.from_pretrained(
    MODEL_ID_X,
    trust_remote_code=True,
    torch_dtype=torch.float16
).to(device).eval()

# Load Thyme-RL
MODEL_ID_N = "Kwai-Keye/Thyme-RL"
processor_n = AutoProcessor.from_pretrained(MODEL_ID_N, trust_remote_code=True)
model_n = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID_N,
    trust_remote_code=True,
    torch_dtype=torch.float16
).to(device).eval()
# --- Preprocessing and Helper Functions ---
def add_random_padding(image, min_percent=0.1, max_percent=0.10):
    """Add random padding to an image based on its size."""
    image = image.convert("RGB")
    width, height = image.size
    pad_w_percent = random.uniform(min_percent, max_percent)
    pad_h_percent = random.uniform(min_percent, max_percent)
    pad_w = int(width * pad_w_percent)
    pad_h = int(height * pad_h_percent)
    corner_pixel = image.getpixel((0, 0))  # Top-left corner
    padded_image = ImageOps.expand(image, border=(pad_w, pad_h, pad_w, pad_h), fill=corner_pixel)
    return padded_image
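
# Usage sketch (hypothetical file name). Note that with the defaults above
# min_percent == max_percent, so the padding is effectively a fixed 10% of
# each dimension; widen the range for genuinely random padding, e.g.:
# padded = add_random_padding(Image.open("page.png"), min_percent=0.05, max_percent=0.15)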

def normalize_values(text, target_max=500):
    """Normalize numerical values in text to a target maximum."""
    def normalize_list(values):
        max_value = max(values) if values else 1
        return [round((v / max_value) * target_max) for v in values]

    def process_match(match):
        num_list = ast.literal_eval(match.group(0))
        normalized = normalize_list(num_list)
        return "".join([f"<loc_{num}>" for num in normalized])

    pattern = r"\[([\d\.\s,]+)\]"
    normalized_text = re.sub(pattern, process_match, text)
    return normalized_text
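
# Worked example: bracketed number lists in the prompt are rescaled so the
# largest value becomes target_max and each value becomes a <loc_*> token:
# normalize_values("[10, 20, 40]")  ->  "<loc_125><loc_250><loc_500>"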

def downsample_video(video_path):
    """Downsample a video to evenly spaced frames, returning PIL images with timestamps."""
    vidcap = cv2.VideoCapture(video_path)
    total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = vidcap.get(cv2.CAP_PROP_FPS) or 30.0  # fall back if fps metadata is missing
    frames = []
    # Use 10 evenly spaced frames for video processing
    frame_indices = np.linspace(0, total_frames - 1, 10, dtype=int)
    for i in frame_indices:
        vidcap.set(cv2.CAP_PROP_POS_FRAMES, i)
        success, image = vidcap.read()
        if success:
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(image)
            timestamp = round(i / fps, 2)
            frames.append((pil_image, timestamp))
    vidcap.release()
    return frames
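
# Usage sketch (hypothetical path): returns up to 10 (PIL.Image, seconds) pairs.
# frames = downsample_video("videos/clip.mp4")
# for img, ts in frames:
#     print(ts, img.size)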

# Format SmolDocling DocTags output; falls back to the cleaned raw text when
# docling_core is not installed or no DocTags are present.
def format_smoldocling_output(buffer_text, images):
    cleaned_output = buffer_text.replace("<end_of_utterance>", "").strip()
    # Check whether docling_core was imported successfully
    if 'DocTagsDocument' in globals() and 'DoclingDocument' in globals():
        if any(tag in cleaned_output for tag in ["<doctag>", "<otsl>", "<code>", "<chart>", "<formula>"]):
            if "<chart>" in cleaned_output:
                cleaned_output = cleaned_output.replace("<chart>", "<otsl>").replace("</chart>", "</otsl>")
                cleaned_output = re.sub(r'(<loc_500>)(?!.*<loc_500>)<[^>]+>', r'\1', cleaned_output)
            doctags_doc = DocTagsDocument.from_doctags_and_image_pairs([cleaned_output], images)
            doc = DoclingDocument.load_from_doctags(doctags_doc, document_name="Document")
            markdown_output = doc.export_to_markdown()
            return buffer_text, markdown_output
    # Fallback if the library is not available or tags are not present
    return buffer_text, cleaned_output

# --- Core Generation Logic ---
def get_model_and_processor(model_name):
    """Helper to select model and processor."""
    if model_name == "Nanonets-OCR-s":
        return processor_m, model_m
    elif model_name == "MonkeyOCR-Recognition":
        return processor_g, model_g
    elif model_name == "SmolDocling-256M-preview":
        return processor_x, model_x
    elif model_name == "Typhoon-OCR-7B":
        return processor_l, model_l
    elif model_name == "Thyme-RL":
        return processor_n, model_n
    else:
        return None, None

@spaces.GPU
def generate_response(model_name: str, text: str, media_input,
                      max_new_tokens: int, temperature: float, top_p: float, top_k: int,
                      repetition_penalty: float, media_type: str = "image"):
    """Unified generation function for both image and video.

    media_type is keyword-last so the wrappers below can forward their
    positional arguments unchanged and set it by keyword."""
    processor, model = get_model_and_processor(model_name)
    if not processor or not model:
        yield "Invalid model selected.", "Invalid model selected."
        return
    if media_input is None:
        yield f"Please upload a {media_type}.", f"Please upload a {media_type}."
        return

    if media_type == "video":
        frames = downsample_video(media_input)
        images = [frame for frame, _ in frames]
    else:  # image
        images = [media_input]

    if model_name == "SmolDocling-256M-preview":
        if "OTSL" in text or "code" in text:
            images = [add_random_padding(img) for img in images]
        if "OCR at text at" in text or "Identify element" in text or "formula" in text:
            text = normalize_values(text, target_max=500)

    messages = [
        {"role": "user", "content": [{"type": "image"} for _ in images] + [{"type": "text", "text": text}]}
    ]
    prompt = processor.apply_chat_template(messages, add_generation_prompt=True)
    inputs = processor(text=prompt, images=images, return_tensors="pt").to(device)

    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = {
        **inputs,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
    }
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text.replace("<|im_end|>", "")
        yield buffer, buffer
    thread.join()

    if model_name == "SmolDocling-256M-preview":
        raw_output, formatted_output = format_smoldocling_output(buffer, images)
        yield raw_output, formatted_output
    else:
        # For other models, the formatted output is just the cleaned buffer
        yield buffer, buffer.strip()
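
# Usage sketch, consuming the streamed (raw_text, formatted_text) pairs
# (the image path is one of the bundled examples; any PIL image works):
# for raw, formatted in generate_response("Nanonets-OCR-s", "OCR the image",
#                                         Image.open("images/2.jpg"),
#                                         1024, 0.6, 0.9, 50, 1.2):
#     print(formatted)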

def generate_image_wrapper(*args):
    yield from generate_response(*args, media_type="image")

def generate_video_wrapper(*args):
    yield from generate_response(*args, media_type="video")

# --- Examples ---
image_examples = [
    ["Reconstruct the doc [table] as it is.", "images/0.png"],
    ["Describe the image!", "images/8.png"],
    ["OCR the image", "images/2.jpg"],
    ["Convert this page to docling", "images/1.png"],
    ["Convert this page to docling", "images/3.png"],
    ["Convert chart to OTSL.", "images/4.png"],
    ["Convert code to text", "images/5.jpg"],
    ["Convert this table to OTSL.", "images/6.jpg"],
    ["Convert formula to latex.", "images/7.jpg"],
]

video_examples = [
    ["Explain the video in detail.", "videos/1.mp4"],
    ["Explain the video in detail.", "videos/2.mp4"]
]

# --- Custom CSS for the new UI ---
css = """
/* Left sidebar styles */
.sidebar {
    background-color: #f8f9fa;
    border-right: 1px solid #e9ecef;
    padding: 20px;
    height: 100vh;
}
/* Main content area */
.content-area {
    padding: 20px;
}
/* Document grid */
.doc-grid {
    display: grid;
    grid-template-columns: repeat(5, 1fr);
    gap: 10px;
    margin: 20px 0;
}
.doc-item {
    border: 1px solid #dee2e6;
    border-radius: 8px;
    padding: 10px;
    text-align: center;
    height: 120px;
    background-color: #f8f9fa;
    cursor: pointer;
    transition: all 0.2s ease;
}
.doc-item:hover {
    border-color: #007bff;
    background-color: #e9f0ff;
}
/* Upload and controls area */
.upload-controls {
    display: flex;
    align-items: center;
    gap: 10px;
    margin: 20px 0;
    padding: 15px;
    border: 1px solid #e9ecef;
    border-radius: 8px;
}
.file-upload {
    flex: 1;
}
.model-dropdown {
    width: 200px;
}
.submit-btn {
    background-color: #007bff;
    color: white;
    border: none;
    border-radius: 4px;
    padding: 10px 20px;
    font-size: 1.2rem;
    cursor: pointer;
    transition: background-color 0.2s;
}
.submit-btn:hover {
    background-color: #0069d9;
}
/* Output area */
.output-area {
    margin-top: 20px;
}
/* Add conversation button */
.add-conv-btn {
    background-color: #28a745;
    color: white;
    border: none;
    padding: 8px 15px;
    border-radius: 4px;
    cursor: pointer;
}
.add-conv-btn:hover {
    background-color: #218838;
}
/* Examples section */
.examples-section {
    margin-top: 20px;
}
/* Header styles */
.header {
    margin-bottom: 15px;
}
/* Media upload icon styling */
.upload-icon {
    font-size: 1.5rem;
    color: #6c757d;
    margin-right: 10px;
}
/* Document icon styling */
.doc-icon {
    font-size: 2rem;
    color: #6c757d;
    margin-bottom: 5px;
}
/* Query input */
.query-input {
    margin: 15px 0;
}
/* Model dropdown styling */
.model-dropdown .select {
    padding: 8px 12px;
    border: 1px solid #ced4da;
    border-radius: 4px;
}
/* Output styling */
.output-text {
    border: 1px solid #ced4da;
    border-radius: 4px;
    padding: 10px;
    min-height: 150px;
}
/* Add some space between elements */
.gr-box {
    margin-bottom: 15px;
}
"""

# --- Gradio Interface ---
with gr.Blocks(css=css) as demo:
    gr.Markdown("# **[Multimodal OCR2](https://huggingface.co/collections/prithivMLmods/multimodal-implementations-67c9982ea04b39f0608badb0)**")

    # State components, defined up front so gr.Examples below can reference
    # them (defining them after the Examples raises a NameError at build time)
    image_query = gr.State("")
    video_query = gr.State("")
    image_upload = gr.State(None)
    video_upload = gr.State(None)
    media_type = gr.State("image")
    with gr.Row():
        # Left sidebar - OCR section
        with gr.Column(scale=1, min_width=250, elem_classes="sidebar"):
            gr.Markdown("## OCR")
            add_conv_btn = gr.Button("+ Add Conv", elem_classes="add-conv-btn")
            # Document grid
            gr.Markdown("### Documents")
            with gr.Group(elem_classes="doc-grid"):
                for i in range(5):
                    with gr.Column():
                        gr.Markdown(f'<div class="doc-item"><div class="doc-icon">📄</div>Doc {i+1}</div>')

        # Main content area
        with gr.Column(scale=3, elem_classes="content-area"):
            # Document processing section
            with gr.Group():
                gr.Markdown("## Multimodal OCR2")
                # Document grid (5 document thumbnails as shown in the sketch)
                with gr.Row(elem_classes="doc-grid"):
                    for i in range(5):
                        with gr.Column():
                            doc_item = gr.Image(
                                value=None,
                                label=f"Document {i+1}",
                                height=120,
                                show_label=False,
                                container=False,
                                elem_classes="doc-item"
                            )
            # Examples section
            gr.Markdown("### Examples")
            with gr.Row():
                with gr.Column():
                    gr.Examples(
                        examples=image_examples,
                        inputs=[image_query, image_upload],
                        label="Image Examples"
                    )
                with gr.Column():
                    gr.Examples(
                        examples=video_examples,
                        inputs=[video_query, video_upload],
                        label="Video Examples"
                    )
            # File upload and controls
            with gr.Group(elem_classes="upload-controls"):
                # File upload area
                with gr.Column(elem_classes="file-upload"):
                    file_upload = gr.File(
                        label="Upload files (image/video)",
                        file_types=["image", "video"],
                        elem_classes="file-upload"
                    )
                # Model dropdown
                model_dropdown = gr.Dropdown(
                    choices=["Nanonets-OCR-s", "MonkeyOCR-Recognition", "Thyme-RL", "Typhoon-OCR-7B", "SmolDocling-256M-preview"],
                    value="Nanonets-OCR-s",
                    label="Select Model",
                    elem_classes="model-dropdown"
                )
                # Submit button
                submit_btn = gr.Button("→", size="lg", elem_classes="submit-btn")

            # Advanced options (hidden by default)
            with gr.Accordion("Advanced Options", open=False):
                max_new_tokens = gr.Slider(label="Max New Tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS)
                temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6)
                top_p = gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9)
                top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
                repetition_penalty = gr.Slider(label="Repetition Penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2)
            # Query input
            query_input = gr.Textbox(
                label="Enter your query",
                placeholder="Describe the image, extract text, convert to markdown...",
                elem_classes="query-input"
            )

            # Output area: the generators yield (raw, formatted) pairs, so two
            # distinct components are needed (the same component cannot be
            # wired twice as an output)
            with gr.Group(elem_classes="output-area"):
                gr.Markdown("### Output")
                raw_output = gr.Textbox(
                    label="Result",
                    interactive=False,
                    lines=10,
                    elem_classes="output-text"
                )
                # Formatted view (e.g. Markdown reconstructed from DocTags)
                formatted_output = gr.Markdown(elem_classes="output-text")
    # --- Event Handlers ---
    def handle_file_upload(file):
        if file is None:
            return "image", None, None
        file_path = file.name
        if file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')):
            return "image", Image.open(file_path), None
        elif file_path.lower().endswith(('.mp4', '.avi', '.mov', '.mkv')):
            return "video", None, file_path
        return "image", None, None

    file_upload.change(
        fn=handle_file_upload,
        inputs=[file_upload],
        outputs=[media_type, image_upload, video_upload]
    )

    def handle_model_selection(model_name):
        # Placeholder hook for updating the UI on model selection; returns
        # nothing because no outputs are wired up.
        return

    model_dropdown.change(
        fn=handle_model_selection,
        inputs=[model_dropdown],
        outputs=[]
    )

    def generate_wrapper(text, img, vid, model, max_tokens, temp, top_p, top_k, rep_penalty, m_type):
        # Forward arguments in the order generate_response expects:
        # (model_name, text, media_input, ...generation params...)
        if m_type == "image" and img is not None:
            yield from generate_image_wrapper(model, text, img, max_tokens, temp, top_p, top_k, rep_penalty)
        elif m_type == "video" and vid is not None:
            yield from generate_video_wrapper(model, text, vid, max_tokens, temp, top_p, top_k, rep_penalty)
        else:
            yield "Please upload a valid file", "Please upload a valid file"

    submit_btn.click(
        fn=generate_wrapper,
        inputs=[
            query_input,
            image_upload,
            video_upload,
            model_dropdown,
            max_new_tokens,
            temperature,
            top_p,
            top_k,
            repetition_penalty,
            media_type
        ],
        outputs=[raw_output, formatted_output]
    )

if __name__ == "__main__":
    demo.queue(max_size=50).launch(share=True, show_error=True)
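
# Dependency sketch (assumed, not pinned by this file): the app expects
# gradio, spaces, torch, transformers, opencv-python, pillow, and numpy,
# plus (optionally) docling-core for the SmolDocling DocTags -> Markdown path.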