# HuggingFace Spaces page header (scrape artifact): "Spaces — Running on Zero(GPU)"
| import gradio as gr | |
| import os | |
| import json | |
| import tempfile | |
| import logging | |
| import warnings | |
| from PIL import Image, ImageDraw, ImageFont | |
| import math | |
| import numpy as np | |
| from pathlib import Path | |
| from typing import Optional, Tuple, List, Dict, Any | |
# Suppress warnings for HuggingFace Spaces
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

# Try to import spaces for ZeroGPU support.
# SPACES_AVAILABLE gates the @spaces.GPU code path defined further down.
try:
    import spaces
    SPACES_AVAILABLE = True
    logger_temp = logging.getLogger(__name__)
    logger_temp.info("HuggingFace Spaces library available - ZeroGPU support enabled")
except ImportError:
    SPACES_AVAILABLE = False
    logger_temp = logging.getLogger(__name__)
    logger_temp.info("HuggingFace Spaces library not available - running without ZeroGPU")

# No external markdown dependency needed

# Import configuration
from config import (
    MODEL_NAME, LAYOUT_COLORS,
    GRADIO_THEME, GRADIO_TITLE, GRADIO_DESCRIPTION,
    DEFAULT_ENABLE_ANGLE_CORRECTION,
    ERROR_MESSAGES, SUCCESS_MESSAGES, IS_HUGGINGFACE_SPACE,
    HUGGINGFACE_TOKEN
)

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Import youtu parsing modules; when missing, the app still starts and
# reports the problem in the UI instead of crashing.
try:
    from youtu_hf_parser import YoutuOCRParserHF
    from youtu_parsing_utils import IMAGE_EXT, PDF_EXT, load_image, load_images_from_pdf
    YOUTU_PARSING_AVAILABLE = True
    logger.info("Youtu-Parsing modules imported successfully")
except ImportError as e:
    logger.warning(f"Failed to import youtu parsing modules: {e}")
    logger.warning("Please ensure youtu-parsing is properly installed")
    YOUTU_PARSING_AVAILABLE = False

# Global variables
# Note: For ZeroGPU, we should NOT load model in main process
# Model will be loaded lazily inside @spaces.GPU decorated function
parser = None         # lazily-created YoutuOCRParserHF instance (or None)
model_loaded = False  # True once _load_model_internal() has succeeded
def _log_model_error(headline: str, detail_lines: List[str]) -> None:
    """Log a visually separated error block for a model-loading failure.

    Emits a "=" banner, the headline, each detail line, then a closing banner.
    Factored out of _load_model_internal, where four except handlers repeated
    the same pattern verbatim.
    """
    logger.error("=" * 60)
    logger.error(headline)
    for line in detail_lines:
        logger.error(line)
    logger.error("=" * 60)


def _load_model_internal() -> Optional[YoutuOCRParserHF]:
    """Load the Youtu-Parsing model from HuggingFace.

    Returns:
        The (cached) YoutuOCRParserHF instance, or None when the youtu-parsing
        modules are unavailable or loading fails. Failures are logged, never
        raised, so callers can fall back to an error message in the UI.
    """
    global parser, model_loaded
    # Return the cached parser when a previous call already succeeded.
    if model_loaded and parser is not None:
        logger.info("Model already loaded, returning cached parser")
        return parser
    if not YOUTU_PARSING_AVAILABLE:
        logger.error("Youtu-Parsing modules not available")
        logger.error("Please ensure youtu-parsing is properly installed:")
        logger.error(" pip install git+https://github.com/TencentCloudADP/youtu-parsing.git#subdirectory=youtu_hf_parser")
        return None
    try:
        logger.info("=" * 60)
        logger.info(f"Starting model loading: {MODEL_NAME}")
        logger.info(f"Is HuggingFace Space: {IS_HUGGINGFACE_SPACE}")
        # IMPORTANT: Do NOT call torch.cuda methods in main process for ZeroGPU!
        # ZeroGPU will automatically handle device placement inside @spaces.GPU context
        logger.info("Loading model (device placement handled by ZeroGPU)")
        # Prepare model loading parameters
        model_kwargs = {
            "model_path": MODEL_NAME,
            "enable_angle_correct": True,
        }
        # Add HuggingFace token if available (for private/gated models)
        if IS_HUGGINGFACE_SPACE:
            if HUGGINGFACE_TOKEN:
                logger.info("Using HuggingFace token for authentication")
                model_kwargs["token"] = HUGGINGFACE_TOKEN
            else:
                logger.warning("HF_TOKEN not found in environment variables")
                logger.warning("If the model is private or gated, please set HF_TOKEN in Space settings")
        logger.info("Initializing YoutuOCRParserHF...")
        logger.info(f"Model kwargs: {model_kwargs}")
        # Load the parser.
        # In ZeroGPU: loads on CPU, moves to GPU inside @spaces.GPU decorated function
        parser = YoutuOCRParserHF(**model_kwargs)
        model_loaded = True
        logger.info("=" * 60)
        logger.info("โ " + SUCCESS_MESSAGES["model_loaded"])
        logger.info("=" * 60)
        return parser
    except ImportError as e:
        _log_model_error(
            f"โ Import error: {str(e)}",
            [
                "Missing dependencies. Please ensure all required packages are installed:",
                " - torch>=2.0.0",
                " - transformers>=4.30.0",
                " - accelerate>=0.20.0",
                " - pillow>=8.0.0",
                " - numpy>=1.20.0",
            ],
        )
        return None
    except MemoryError as e:
        _log_model_error(
            f"โ Memory error: {str(e)}",
            [
                "Insufficient memory to load the model",
                "Solutions:",
                " 1. Upgrade to a Space with more RAM",
                " 2. Use ZeroGPU hardware tier",
                " 3. Contact HuggingFace support for assistance",
            ],
        )
        return None
    except OSError as e:
        _log_model_error(
            f"โ OS/File error: {str(e)}",
            [
                "This might be a model download issue or disk space problem",
                "Possible causes:",
                " - Network timeout during model download",
                " - Insufficient disk space",
                " - Permission issues",
                " - Model repository not accessible",
            ],
        )
        return None
    except Exception as e:
        import traceback  # local import: only needed on this failure path
        _log_model_error(
            f"โ Unexpected error loading model: {str(e)}",
            [
                f"Error type: {type(e).__name__}",
                "Full traceback:",
                "-" * 60,
                traceback.format_exc(),
            ],
        )
        return None
def draw_layout_boxes(image: Image.Image, bboxes: List[Dict]) -> Image.Image:
    """Draw semi-transparent layout bounding boxes with labels on the image.

    Args:
        image: Source page image (any PIL mode).
        bboxes: List of dicts, each with an 8-value quad 'bbox'
            [x0, y0, x1, y1, x2, y2, x3, y3] and an optional 'type' tag
            such as '<LAYOUT_Text>'.

    Returns:
        A new RGB image with the overlay composited on top. The original
        image is returned unchanged when bboxes is empty or compositing
        fails.
    """
    if not bboxes:
        return image
    # Work on an RGBA copy so the translucent overlay can be alpha-composited.
    draw_image = image.copy()
    if draw_image.mode != "RGBA":
        draw_image = draw_image.convert("RGBA")
    overlay = Image.new("RGBA", image.size, (0, 0, 0, 0))
    draw = ImageDraw.Draw(overlay)
    # The built-in bitmap font requires no external font files.
    # (The original wrapped this in try/except whose fallback called the
    # exact same function - a no-op guard, removed.)
    font = ImageFont.load_default()
    for i, cell in enumerate(bboxes):
        bbox = cell.get('bbox', [])
        if len(bbox) < 8:
            # Skip malformed entries rather than failing the whole page.
            continue
        # Convert bbox to points: [x0, y0, x1, y1, x2, y2, x3, y3]
        pts = [(bbox[j], bbox[j + 1]) for j in range(0, 8, 2)]
        # '<LAYOUT_Text>' -> 'Text'; empty type falls back to 'Unknown'.
        layout_type = cell.get('type', '').replace('<LAYOUT_', '').replace('>', '') or 'Unknown'
        color = LAYOUT_COLORS.get(layout_type, LAYOUT_COLORS['Unknown'])
        # Translucent fill, fully opaque outline and label.
        fill_color = tuple(color[:3]) + (100,)
        outline_color = tuple(color[:3]) + (255,)
        try:
            draw.polygon(pts, outline=outline_color, fill=fill_color)
            # Label each region with its reading-order index and type.
            order_cate = f"{i}_{layout_type}"
            text_color = tuple(color[:3]) + (255,)
            # Anchor the label at the first corner of the quad.
            x_anchor, y_anchor = pts[0]
            draw.text((x_anchor, y_anchor), order_cate, font=font, fill=text_color)
        except Exception as e:
            logger.warning(f"Error drawing bbox {i}: {e}")
            continue
    # Composite the overlay onto the original image.
    try:
        result = Image.alpha_composite(draw_image, overlay)
        return result.convert("RGB")
    except Exception as e:
        logger.error(f"Error compositing image: {e}")
        return image
# Decorator for GPU acceleration if available.
# BUG FIX: the SPACES_AVAILABLE branch previously defined the same
# undecorated wrapper as the non-Spaces branch, so @spaces.GPU was never
# applied and ZeroGPU never attached a GPU for inference. The decorator is
# required so _parse_document_internal runs inside the @spaces.GPU context
# (as its own docstring and the model-loading comments assume).
if SPACES_AVAILABLE:
    @spaces.GPU
    def parse_document(image: Optional[Image.Image],
                       enable_angle_corrector: bool) -> Tuple[Optional[Image.Image], str, str, str, str]:
        """Parse the uploaded document (with ZeroGPU support)

        Returns:
            Tuple of (output_image, markdown_rendered, markdown_source, json_output, status_msg)
        """
        return _parse_document_internal(image, enable_angle_corrector)
else:
    def parse_document(image: Optional[Image.Image],
                       enable_angle_corrector: bool) -> Tuple[Optional[Image.Image], str, str, str, str]:
        """Parse the uploaded document (without ZeroGPU)

        Returns:
            Tuple of (output_image, markdown_rendered, markdown_source, json_output, status_msg)
        """
        return _parse_document_internal(image, enable_angle_corrector)
def _parse_document_internal(image: Optional[Image.Image],
                             enable_angle_corrector: bool) -> Tuple[Optional[Image.Image], str, str, str, str]:
    """Run the full parsing pipeline on a single page image.

    When ZeroGPU is available this is invoked inside the @spaces.GPU
    context, so lazily loading the model here is safe - CUDA is
    initialized properly by ZeroGPU.

    Returns:
        Tuple of (output_image, markdown_rendered, markdown_source,
        json_output, status_msg).
    """
    global parser

    # Guard clauses: nothing to do without an image or the parsing modules.
    if image is None:
        return None, "<p>Please upload an image first</p>", "", "", ERROR_MESSAGES["no_image"]
    if not YOUTU_PARSING_AVAILABLE:
        return None, "<p>Youtu-Parsing module is not available, please check installation</p>", "", "", "Youtu-Parsing modules are not available. Please check the installation."

    # Lazily load the model on first use.
    if parser is None:
        parser = _load_model_internal()
        if parser is None:
            return None, "<p>Model loading failed</p>", "", "", ERROR_MESSAGES["model_load_failed"]

    try:
        logger.info(f"Parsing document (enable_angle_corrector={enable_angle_corrector})")
        # The parser accepts a PIL image directly - no temp-file round-trip.
        blocks, angle, hierarchy = parser._parse_single_image(
            image,
            enable_angle_corrector=enable_angle_corrector
        )
        if not blocks:
            return None, "No parsing results", "", "", ERROR_MESSAGES["no_results"]

        # Collect detected regions for the layout visualization.
        boxes = [
            {
                'bbox': blk['bbox'],
                'type': blk.get('type', ''),
                'content': blk.get('content', '')
            }
            for blk in blocks
            if 'bbox' in blk
        ]
        annotated = draw_layout_boxes(image, boxes)

        # Assemble markdown from textual regions (Figure blocks are skipped).
        md_text = "\n\n".join(
            blk.get('content', '') for blk in blocks
            if blk.get('content') and blk.get('type') != 'Figure'
        )

        # Bundle the structured result (including hierarchy info) as JSON.
        result_json = json.dumps(
            {
                "page_result": blocks,
                "page_angle": angle,
                "hierarchy": hierarchy
            },
            ensure_ascii=False,
            indent=2
        )

        logger.info(f"Generated markdown content (first 200 chars): {md_text[:200] if md_text else 'empty'}")
        logger.info("Document parsing completed successfully")
        # Markdown text is returned twice: once for the rendered view,
        # once for the raw-source tab.
        return annotated, md_text, md_text, result_json, SUCCESS_MESSAGES["parsing_complete"]
    except Exception as e:
        logger.error(f"Error during parsing: {str(e)}")
        return None, f"Parsing error: {str(e)}", "", "", ERROR_MESSAGES["parsing_failed"].format(str(e))
def create_interface():
    """Create the Gradio interface - simplified layout for HuggingFace Space compatibility

    Builds a two-column Blocks app: image upload plus options on the left,
    tabbed results (visualization / markdown / JSON) on the right.
    Returns the gr.Blocks instance (not yet launched).
    """
    # Custom CSS: web fonts and consistent typography across components
    custom_css = """
    @import url('https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&family=Noto+Sans+SC:wght@400;500;700&display=swap');
    * {
        font-family: 'Inter', 'Noto Sans SC', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif !important;
    }
    .markdown-text {
        font-family: 'Inter', 'Noto Sans SC', sans-serif !important;
        line-height: 1.7 !important;
    }
    h1, h2, h3, h4, h5, h6 {
        font-weight: 600 !important;
    }
    code, pre {
        font-family: 'JetBrains Mono', 'Fira Code', 'SF Mono', Consolas, monospace !important;
    }
    textarea, input {
        font-family: 'Inter', 'Noto Sans SC', sans-serif !important;
    }
    """
    with gr.Blocks(title=GRADIO_TITLE, css=custom_css) as demo:
        gr.Markdown(f"# ๐ {GRADIO_TITLE}")
        gr.Markdown(f"{GRADIO_DESCRIPTION}")
        with gr.Row():
            # Left column: input image and parsing controls
            with gr.Column(scale=1):
                input_image = gr.Image(
                    type="pil",
                    label="Upload Document Image",
                    height=300,
                    sources=["upload", "clipboard"]
                )
                with gr.Accordion("โ๏ธ Advanced Options", open=False):
                    enable_angle_corrector = gr.Checkbox(
                        label="Enable Angle Correction",
                        value=DEFAULT_ENABLE_ANGLE_CORRECTION,
                        info="Automatically correct document orientation"
                    )
                parse_btn = gr.Button("๐ Start Parsing", variant="primary", size="lg")
                status_msg = gr.Textbox(label="Status", interactive=False, lines=2)
            # Right column: tabbed result views
            with gr.Column(scale=2):
                with gr.Tabs():
                    with gr.Tab("Visualization"):
                        output_image = gr.Image(label="Layout Detection Result", height=500)
                    with gr.Tab("Markdown Rendered"):
                        markdown_rendered = gr.Markdown(
                            value="Upload a document and the parsing results will appear here...",
                            # Render LaTeX with both $ / $$ and \( \) / \[ \] delimiters
                            latex_delimiters=[
                                {"left": "$$", "right": "$$", "display": True},
                                {"left": "$", "right": "$", "display": False},
                                {"left": "\\[", "right": "\\]", "display": True},
                                {"left": "\\(", "right": "\\)", "display": False},
                            ]
                        )
                    with gr.Tab("Markdown Source"):
                        markdown_source = gr.Textbox(label="Markdown Source Code", lines=20)
                    with gr.Tab("JSON Output"):
                        json_output = gr.Textbox(label="Structured Data", lines=20)
        # Event handler: wire the parse button to parse_document
        parse_btn.click(
            fn=parse_document,
            inputs=[input_image, enable_angle_corrector],
            outputs=[output_image, markdown_rendered, markdown_source, json_output, status_msg]
        )
        with gr.Accordion("โน๏ธ Instructions", open=False):
            gr.Markdown("""
            ### Supported Document Types
            - **Text Documents** - Documents containing text and tables
            - **Charts & Graphics** - Various charts and diagrams
            - **Math Formulas** - Mathematical expressions in LaTeX format
            ### How to Use
            1. Upload a document image (supports JPG, PNG, etc.)
            2. Click the "Start Parsing" button
            3. View the results (Visualization, Markdown, JSON)
            """)
    return demo
def main():
    """Application entry point: preload the model, then launch the UI.

    The model is loaded eagerly so weights are downloaded during startup
    rather than on the first user request; a preload failure is non-fatal
    because the first inference retries via _parse_document_internal.
    """
    global parser, model_loaded

    banner = "=" * 60
    logger.info(banner)
    logger.info("๐ Starting Youtu-Parsing Application")
    logger.info(banner)
    logger.info(f"Environment: {'HuggingFace Space' if IS_HUGGINGFACE_SPACE else 'Local'}")
    logger.info("Preloading model before interface launch...")

    # Always attempt the preload; any outcome falls through to the launch.
    try:
        parser = _load_model_internal()
        if parser is not None:
            logger.info("โ Model preloaded successfully")
            model_loaded = True
        else:
            logger.warning("โ ๏ธ Model preload failed, will retry on first inference")
    except Exception as e:
        logger.error(f"โ Error preloading model: {e}")
        import traceback
        logger.error(traceback.format_exc())
        logger.warning("โ ๏ธ Will attempt to load model on first inference")

    # Build and serve the Gradio app.
    logger.info("Creating Gradio interface...")
    app = create_interface()
    logger.info("Launching Gradio interface...")
    # Queue up to 20 pending requests; no share link, no auto-opened browser.
    app.queue(max_size=20).launch(
        share=False,
        inbrowser=False
    )
| if __name__ == "__main__": | |
| main() |