Multimodal-OCR3 / app.py
prithivMLmods's picture
Update app.py
3396a9a verified
raw
history blame
10.9 kB
import os
import random
import uuid
import json
import time
import asyncio
from threading import Thread
from typing import Iterable
import gradio as gr
import spaces
import torch
import numpy as np
from PIL import Image, ImageOps
import cv2
import requests
from transformers import (
AutoTokenizer,
AutoProcessor,
TextIteratorStreamer,
)
from transformers.image_utils import load_image
# The custom model class is imported via trust_remote_code=True
from transformers import AutoModelForImageTextToText
from gradio.themes import Soft
from gradio.themes.utils import colors, fonts, sizes
from docling_core.types.doc import DoclingDocument, DocTagsDocument
import re
import ast
import html
# --- Theme and CSS Definition ---
colors.steel_blue = colors.Color(
name="steel_blue",
c50="#EBF3F8",
c100="#D3E5F0",
c200="#A8CCE1",
c300="#7DB3D2",
c400="#529AC3",
c500="#4682B4", # SteelBlue base color
c600="#3E72A0",
c700="#36638C",
c800="#2E5378",
c900="#264364",
c950="#1E3450",
)
class SteelBlueTheme(Soft):
def __init__(
self,
*,
primary_hue: colors.Color | str = colors.gray,
secondary_hue: colors.Color | str = colors.steel_blue,
neutral_hue: colors.Color | str = colors.slate,
text_size: sizes.Size | str = sizes.text_lg,
font: fonts.Font | str | Iterable[fonts.Font | str] = (
fonts.GoogleFont("Outfit"), "Arial", "sans-serif",
),
font_mono: fonts.Font | str | Iterable[fonts.Font | str] = (
fonts.GoogleFont("IBM Plex Mono"), "ui-monospace", "monospace",
),
):
super().__init__(
primary_hue=primary_hue,
secondary_hue=secondary_hue,
neutral_hue=neutral_hue,
text_size=text_size,
font=font,
font_mono=font_mono,
)
super().set(
background_fill_primary="*primary_50",
background_fill_primary_dark="*primary_900",
body_background_fill="linear-gradient(135deg, *primary_200, *primary_100)",
body_background_fill_dark="linear-gradient(135deg, *primary_900, *primary_800)",
button_primary_text_color="white",
button_primary_text_color_hover="white",
button_primary_background_fill="linear-gradient(90deg, *secondary_500, *secondary_600)",
button_primary_background_fill_hover="linear-gradient(90deg, *secondary_600, *secondary_700)",
button_primary_background_fill_dark="linear-gradient(90deg, *secondary_600, *secondary_700)",
button_primary_background_fill_hover_dark="linear-gradient(90deg, *secondary_500, *secondary_600)",
slider_color="*secondary_500",
slider_color_dark="*secondary_600",
block_title_text_weight="600",
block_border_width="3px",
block_shadow="*shadow_drop_lg",
button_primary_shadow="*shadow_drop_lg",
button_large_padding="11px",
color_accent_soft="*primary_100",
block_label_background_fill="*primary_200",
)
steel_blue_theme = SteelBlueTheme()
css = """
#main-title h1 {
font-size: 2.3em !important;
}
#output-title h2 {
font-size: 2.1em !important;
}
"""
# Constants for text generation
MAX_MAX_NEW_TOKENS = 5120
DEFAULT_MAX_NEW_TOKENS = 3072
MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096"))
# Check for CUDA availability
device = "cuda" if torch.cuda.is_available() else "cpu"
# Load Nanonets-OCR2-3B
MODEL_ID_3B = "nanonets/Nanonets-OCR2-3B"
processor_3b = AutoProcessor.from_pretrained(MODEL_ID_3B, trust_remote_code=True)
model_3b = AutoModelForImageTextToText.from_pretrained(
MODEL_ID_3B,
torch_dtype="auto",
device_map="auto",
trust_remote_code=True,
attn_implementation="flash_attention_2"
).eval()
# Load Nanonets-OCR2-1.5B-exp
MODEL_ID_1_5B = "nanonets/Nanonets-OCR2-1.5B-exp"
processor_1_5b = AutoProcessor.from_pretrained(MODEL_ID_1_5B, trust_remote_code=True)
model_1_5b = AutoModelForImageTextToText.from_pretrained(
MODEL_ID_1_5B,
torch_dtype="auto",
device_map="auto",
trust_remote_code=True,
attn_implementation="flash_attention_2"
).eval()
def downsample_video(video_path):
"""Downsample a video to evenly spaced frames, returning PIL images with timestamps."""
vidcap = cv2.VideoCapture(video_path)
total_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = vidcap.get(cv2.CAP_PROP_FPS)
frames = []
# Use a smaller number of frames for video to avoid overwhelming the model
frame_indices = np.linspace(0, total_frames - 1, min(total_frames, 10), dtype=int)
for i in frame_indices:
vidcap.set(cv2.CAP_PROP_POS_FRAMES, i)
success, image = vidcap.read()
if success:
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(image)
timestamp = round(i / fps, 2)
frames.append((pil_image, timestamp))
vidcap.release()
return frames
@spaces.GPU
def generate(model_name: str, text: str, media_input, media_type: str,
max_new_tokens: int = 1024,
temperature: float = 0.6,
top_p: float = 0.9,
top_k: int = 50,
repetition_penalty: float = 1.2):
"""Generic generation function for both image and video."""
if model_name == "Nanonets-OCR2-3B":
processor, model = processor_3b, model_3b
elif model_name == "Nanonets-OCR2-1.5B-exp":
processor, model = processor_1_5b, model_1_5b
else:
yield "Invalid model selected.", "Invalid model selected."
return
if media_input is None:
yield f"Please upload an {media_type}.", f"Please upload an {media_type}."
return
if media_type == "image":
images = [media_input]
elif media_type == "video":
frames = downsample_video(media_input)
images = [frame for frame, _ in frames]
else:
yield "Invalid media type.", "Invalid media type."
return
messages = [
{
"role": "user",
"content": [{"type": "image"} for _ in images] + [
{"type": "text", "text": text}
]
}
]
prompt = processor.apply_chat_template(messages, add_generation_prompt=True)
# Since device_map="auto" is used, we don't need .to(device)
inputs = processor(text=prompt, images=images, return_tensors="pt")
streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
generation_kwargs = {
**inputs,
"streamer": streamer,
"max_new_tokens": max_new_tokens,
"temperature": temperature,
"top_p": top_p,
"top_k": top_k,
"repetition_penalty": repetition_penalty,
}
thread = Thread(target=model.generate, kwargs=generation_kwargs)
thread.start()
buffer = ""
for new_text in streamer:
buffer += new_text.replace("<|im_end|>", "")
yield buffer, buffer
# Wrapper functions for Gradio clarity
def generate_image(*args):
yield from generate(*args[:3], media_input=args[2], media_type="image", *args[3:])
def generate_video(*args):
yield from generate(*args[:3], media_input=args[2], media_type="video", *args[3:])
# Define examples for image and video inference
image_examples = [
["Reconstruct the doc [table] as it is.", "images/0.png"],
["Describe the image!", "images/8.png"],
["OCR the image", "images/2.jpg"],
["Convert this page to docling", "images/1.png"],
["Convert this page to docling", "images/3.png"],
["Convert chart to OTSL.", "images/4.png"],
["Convert code to text", "images/5.jpg"],
["Convert this table to OTSL.", "images/6.jpg"],
["Convert formula to late.", "images/7.jpg"],
]
video_examples = [
["Explain the video in detail.", "videos/1.mp4"],
["Explain the video in detail.", "videos/2.mp4"]
]
# Create the Gradio Interface
with gr.Blocks(css=css, theme=steel_blue_theme) as demo:
gr.Markdown("# **Multimodal OCR3**", elem_id="main-title")
with gr.Row():
with gr.Column(scale=2):
with gr.Tabs():
with gr.TabItem("Image Inference"):
image_query = gr.Textbox(label="Query Input", placeholder="Enter your query here...")
image_upload = gr.Image(type="pil", label="Upload Image", height=290)
image_submit = gr.Button("Submit", variant="primary")
gr.Examples(examples=image_examples, inputs=[image_query, image_upload])
with gr.TabItem("Video Inference"):
video_query = gr.Textbox(label="Query Input", placeholder="Enter your query here...")
video_upload = gr.Video(label="Upload Video (<= 30s)", height=290)
video_submit = gr.Button("Submit", variant="primary")
gr.Examples(examples=video_examples, inputs=[video_query, video_upload])
with gr.Accordion("Advanced options", open=False):
max_new_tokens = gr.Slider(label="Max new tokens", minimum=1, maximum=MAX_MAX_NEW_TOKENS, step=1, value=DEFAULT_MAX_NEW_TOKENS)
temperature = gr.Slider(label="Temperature", minimum=0.1, maximum=4.0, step=0.1, value=0.6)
top_p = gr.Slider(label="Top-p (nucleus sampling)", minimum=0.05, maximum=1.0, step=0.05, value=0.9)
top_k = gr.Slider(label="Top-k", minimum=1, maximum=1000, step=1, value=50)
repetition_penalty = gr.Slider(label="Repetition penalty", minimum=1.0, maximum=2.0, step=0.05, value=1.2)
with gr.Column(scale=3):
gr.Markdown("## Output", elem_id="output-title")
raw_output = gr.Textbox(label="Raw Output Stream", interactive=False, lines=11, show_copy_button=True)
with gr.Accordion("(Result.md)", open=True):
formatted_output = gr.Markdown(label="(Result.md)")
model_choice = gr.Radio(
choices=["Nanonets-OCR2-3B", "Nanonets-OCR2-1.5B-exp"],
label="Select Model",
value="Nanonets-OCR2-3B"
)
image_submit.click(
fn=generate_image,
inputs=[model_choice, image_query, image_upload, max_new_tokens, temperature, top_p, top_k, repetition_penalty],
outputs=[raw_output, formatted_output]
)
video_submit.click(
fn=generate_video,
inputs=[model_choice, video_query, video_upload, max_new_tokens, temperature, top_p, top_k, repetition_penalty],
outputs=[raw_output, formatted_output]
)
if __name__ == "__main__":
demo.queue(max_size=50).launch(ssr_mode=False, show_error=True)