#!/usr/bin/env python3
"""
MonkeyOCR 3B Gradio App for MacBook M4 Pro with MPS Acceleration
Optimized for local deployment with Apple Silicon GPU acceleration
"""

import os
import sys
import tempfile
import shutil
from pathlib import Path
import base64
import re
import uuid
import subprocess
from typing import Optional, Tuple

import gradio as gr
import torch
from PIL import Image
from pdf2image import convert_from_path
from loguru import logger

# Apply PyTorch patch for doclayout_yolo compatibility.
# This runs before the MonkeyOCR imports below, as in the original ordering.
from torch_patch import patch_torch_load
patch_torch_load()

# Add MonkeyOCR to path
sys.path.append("./MonkeyOCR")

try:
    from magic_pdf.data.data_reader_writer import FileBasedDataWriter, FileBasedDataReader
    from magic_pdf.data.dataset import PymuDocDataset, ImageDataset
    from magic_pdf.model.doc_analyze_by_custom_model_llm import doc_analyze_llm
    from magic_pdf.model.custom_model import MonkeyOCR
except ImportError as e:
    logger.error(f"Failed to import MonkeyOCR modules: {e}")
    logger.info("Please ensure MonkeyOCR is properly installed")
    sys.exit(1)

# Global model instance (created lazily by initialize_model and then reused)
model_instance = None


def initialize_model(config_path: str = "model_configs_mps.yaml") -> MonkeyOCR:
    """Return the shared MonkeyOCR model, creating it on first use.

    On the first call the model is built from ``config_path``. If MPS is not
    available, the ``device`` key of the YAML config is set to ``'cpu'`` and
    the file is rewritten in place before the model is constructed. Later
    calls return the cached instance and ignore ``config_path``.

    Args:
        config_path: Path to the YAML model configuration file.

    Returns:
        The module-wide ``MonkeyOCR`` instance.

    Raises:
        Exception: Whatever ``MonkeyOCR(config_path)`` raises is logged and
            re-raised unchanged.
    """
    global model_instance

    if model_instance is None:
        logger.info("Initializing MonkeyOCR model with MPS acceleration...")

        # Check if MPS is available
        if not torch.backends.mps.is_available():
            logger.warning("MPS not available, falling back to CPU")
            # Modify config to use CPU
            import yaml

            with open(config_path, 'r', encoding='utf-8') as f:
                # safe_load returns None for an empty file; treat as empty map
                config = yaml.safe_load(f) or {}

            # NOTE: this persists the change to the config file on disk.
            # Only rewrite when the value actually changes so repeated
            # CPU-only starts do not keep re-serialising the file.
            if config.get('device') != 'cpu':
                config['device'] = 'cpu'
                with open(config_path, 'w', encoding='utf-8') as f:
                    yaml.dump(config, f)
        else:
            logger.info("MPS is available and will be used for acceleration")
            # Set environment variables for optimal MPS performance
            os.environ['PYTORCH_MPS_HIGH_WATERMARK_RATIO'] = '0.0'

        try:
            model_instance = MonkeyOCR(config_path)
            logger.info("Model initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize model: {e}")
            raise

    return model_instance


def render_latex_table_to_image(latex_content: str, temp_dir: str) -> str:
    """Render a LaTeX table to a PNG and return it as an inline HTML img tag.

    The first ``tabular`` environment found in ``latex_content`` is wrapped
    in a minimal standalone document, compiled with ``pdflatex``, rasterised
    at 300 dpi and embedded as a base64 data URI.

    Args:
        latex_content: Text that may contain a ``tabular`` environment.
        temp_dir: Existing directory used for intermediate files.

    Returns:
        An ``<img>`` tag on success; ``latex_content`` unchanged when it has
        no ``\\begin{tabular}``; otherwise ``latex_content`` wrapped in
        ``<pre>`` when compilation or conversion fails.
    """
    # NOTE(review): the HTML markup in the returned strings and the except
    # clause were reconstructed from the docstring/visible structure; confirm
    # against the upstream version of this file.

    # Every file that may be created for this render; removed in ``finally``.
    artifacts = []
    try:
        # Extract tabular environment content
        pattern = r"(\\begin\{tabular\}.*?\\end\{tabular\})"
        matches = re.findall(pattern, latex_content, re.DOTALL)

        if matches:
            table_content = matches[0]
        elif '\\begin{tabular}' in latex_content:
            # Unterminated table: close it so LaTeX has a chance to compile
            if '\\end{tabular}' not in latex_content:
                table_content = latex_content + '\n\\end{tabular}'
            else:
                table_content = latex_content
        else:
            return latex_content

        # Build complete LaTeX document
        full_latex = r"""
\documentclass{article}
\usepackage[utf8]{inputenc}
\usepackage{booktabs}
\usepackage{bm}
\usepackage{multirow}
\usepackage{array}
\usepackage{colortbl}
\usepackage[table]{xcolor}
\usepackage{amsmath}
\usepackage{amssymb}
\usepackage{graphicx}
\usepackage{geometry}
\usepackage{makecell}
\usepackage[active,tightpage]{preview}
\PreviewEnvironment{tabular}
\begin{document}
""" + table_content + r"""
\end{document}
"""

        # Generate unique filename
        unique_id = str(uuid.uuid4())[:8]
        base_path = os.path.join(temp_dir, f"table_{unique_id}")
        tex_path = base_path + ".tex"
        pdf_path = base_path + ".pdf"
        png_path = base_path + ".png"
        # pdflatex also drops .aux/.log next to the output
        artifacts = [tex_path, pdf_path, png_path,
                     base_path + ".aux", base_path + ".log"]

        # Write tex file
        with open(tex_path, "w", encoding="utf-8") as f:
            f.write(full_latex)

        # Compile LaTeX to PDF
        result = subprocess.run(
            ["pdflatex", "-interaction=nonstopmode",
             "-output-directory", temp_dir, tex_path],
            timeout=20,
            capture_output=True,
            text=True
        )

        if result.returncode != 0 or not os.path.exists(pdf_path):
            logger.warning("LaTeX compilation failed, returning original content")
            return f"<pre>{latex_content}</pre>"

        # Convert PDF to PNG
        images = convert_from_path(pdf_path, dpi=300)
        images[0].save(png_path, "PNG")

        # Convert to base64
        with open(png_path, "rb") as f:
            img_data = f.read()
        img_base64 = base64.b64encode(img_data).decode("utf-8")

        return f'<img src="data:image/png;base64,{img_base64}" style="max-width:100%;height:auto;">'

    except Exception as e:
        # Best effort: missing pdflatex, timeouts and conversion errors all
        # fall back to showing the raw LaTeX instead of failing the request.
        logger.error(f"LaTeX rendering error: {e}")
        return f"<pre>{latex_content}</pre>"

    finally:
        # Clean up temporary files on every exit path, not only on success
        for file_path in artifacts:
            try:
                os.remove(file_path)
            except OSError:
                pass
def process_document(file_path: str) -> Tuple[str, str]:
"""Process document and return markdown content and layout PDF path"""
if not file_path:
return "", ""
try:
model = initialize_model()
parent_path = os.path.dirname(file_path)
full_name = os.path.basename(file_path)
name = '.'.join(full_name.split(".")[:-1])
# Create output directories
local_image_dir = os.path.join(parent_path, "markdown", "images")
local_md_dir = os.path.join(parent_path, "markdown")
os.makedirs(local_image_dir, exist_ok=True)
os.makedirs(local_md_dir, exist_ok=True)
image_dir = os.path.basename(local_image_dir)
image_writer = FileBasedDataWriter(local_image_dir)
md_writer = FileBasedDataWriter(local_md_dir)
reader = FileBasedDataReader(parent_path)
# Read file data
data_bytes = reader.read(full_name)
# Create dataset based on file type
if full_name.split(".")[-1].lower() in ['jpg', 'jpeg', 'png']:
ds = ImageDataset(data_bytes)
else:
ds = PymuDocDataset(data_bytes)
# Process document with threading-based timeout
logger.info("Processing document with MonkeyOCR...")
import threading
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError
def process_with_model():
overall_start_time = time.time()
# Step 1: Document Analysis
analysis_start_time = time.time()
logger.info("Starting document analysis...")
infer_result = ds.apply(doc_analyze_llm, MonkeyOCR_model=model)
logger.info(f"PROFILE: Document analysis (doc_analyze_llm) took {time.time() - analysis_start_time:.2f}s")
# Step 2: OCR and Layout Processing
ocr_start_time = time.time()
logger.info("Starting OCR and layout processing...")
pipe_result = infer_result.pipe_ocr_mode(image_writer, MonkeyOCR_model=model)
logger.info(f"PROFILE: OCR/Layout (pipe_ocr_mode) took {time.time() - ocr_start_time:.2f}s")
logger.info(f"PROFILE: Total model processing took {time.time() - overall_start_time:.2f}s")
return infer_result, pipe_result
# Use ThreadPoolExecutor with timeout
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(process_with_model)
try:
infer_result, pipe_result = future.result(timeout=300) # 5 minute timeout
except FutureTimeoutError:
logger.error("Processing timed out after 5 minutes")
raise TimeoutError("Document processing timed out. Please try with a smaller document or simpler layout.")
# Generate layout PDF
layout_pdf_path = os.path.join(parent_path, f"{name}_layout.pdf")
pipe_result.draw_layout(layout_pdf_path)
# Generate markdown
pipe_result.dump_md(md_writer, f"{name}.md", image_dir)
md_content_ori = FileBasedDataReader(local_md_dir).read(f"{name}.md").decode("utf-8")
# Process markdown content (render LaTeX tables and convert images to base64)
temp_dir = tempfile.mkdtemp()
try:
# Process HTML-wrapped LaTeX tables
def replace_html_latex_table(match):
html_content = match.group(1)
if '\\begin{tabular}' in html_content:
return render_latex_table_to_image(html_content, temp_dir)
else:
return match.group(0)
md_content = re.sub(r'(.*?)', replace_html_latex_table, md_content_ori, flags=re.DOTALL)
# Convert local image links to base64
def replace_image_with_base64(match):
img_path = match.group(1)
if not os.path.isabs(img_path):
full_img_path = os.path.join(local_md_dir, img_path)
else:
full_img_path = img_path
try:
if os.path.exists(full_img_path):
with open(full_img_path, "rb") as f:
img_data = f.read()
img_base64 = base64.b64encode(img_data).decode("utf-8")
ext = os.path.splitext(full_img_path)[1].lower()
mime_type = "image/jpeg" if ext in ['.jpg', '.jpeg'] else f"image/{ext[1:]}"
return f'