Spaces:
Running
on
Zero
Running
on
Zero
| import os | |
| import gradio_pdf | |
| import hashlib | |
| import spaces | |
| import re | |
| import time | |
| import click | |
| import gradio as gr | |
| from io import BytesIO | |
| from PIL import Image | |
| from gradio_pdf import PDF | |
| from loguru import logger | |
| import sys # Added for logging configuration | |
| import base64 # Added for image encoding | |
| from bs4 import BeautifulSoup # Added for HTML manipulation | |
| from datetime import datetime | |
| from pathlib import Path | |
| import torch | |
| from transformers import AutoProcessor, Qwen2VLForConditionalGeneration, Qwen2_5_VLForConditionalGeneration | |
| from transformers.image_utils import load_image | |
| import fitz # PyMuPDF library for PDF processing | |
| import html2text | |
| import markdown | |
| import tempfile | |
# File suffixes accepted as input, grouped by how they are handled downstream.
pdf_suffixes = [".pdf"]
image_suffixes = [".png", ".jpeg", ".jpg"]

# --- Model and Processor Initialization ---
# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_ID = "Logics-MLLM/Logics-Parsing"

processor = AutoProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

# Weights are loaded in half precision on GPU and full precision on CPU,
# then moved to the selected device and switched to evaluation mode.
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_ID,
    trust_remote_code=True,
    torch_dtype=torch.float32 if device != "cuda" else torch.float16,
)
model = model.to(device).eval()
def parse_page(image: Image.Image) -> str:
    """
    Run the module-level vision-language model on one page image.

    The image and a fixed instruction prompt are wrapped in a single user
    message, rendered through the processor's chat template, and handed to
    ``model.generate``. The prompt tokens are sliced off each generated
    sequence before decoding.

    Args:
        image: The document page as a PIL image.

    Returns:
        The decoded text of the first sequence in the batch.
    """
    instruction = (
        "Parse this document page into a clean, structured HTML representation. "
        "Preserve the logical structure with appropriate tags for content blocks "
        "such as paragraphs (<p>), headings (<h1>-<h6>), tables (<table>), "
        "figures (<figure>), formulas (<formula>), and others. "
        "Include category tags, and filter out irrelevant elements like headers and footers."
    )
    user_turn = {
        "role": "user",
        "content": [
            {"type": "image", "image": image},
            {"type": "text", "text": instruction},
        ],
    }

    chat_prompt = processor.apply_chat_template(
        [user_turn], tokenize=False, add_generation_prompt=True
    )
    model_inputs = processor(
        text=[chat_prompt],
        images=[image],
        return_tensors="pt",
        padding=True,
    ).to(device)

    generation_options = {
        "max_new_tokens": 2048,
        "temperature": 0.1,
        "top_p": 0.9,
        "do_sample": True,
        "repetition_penalty": 1.05,
    }
    with torch.no_grad():
        output_ids = model.generate(**model_inputs, **generation_options)

    # Keep only what comes after the prompt in each returned sequence.
    completions = []
    for prompt_ids, sequence_ids in zip(model_inputs.input_ids, output_ids):
        completions.append(sequence_ids[len(prompt_ids):])

    decoded = processor.batch_decode(
        completions,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    return decoded[0]
def images_bytes_to_pdf_bytes(image_bytes: bytes) -> bytes:
    """
    Encode an in-memory image as a PDF document.

    Args:
        image_bytes: Raw bytes of an image file that PIL can open.

    Returns:
        The bytes of a PDF produced by PIL from the RGB-converted image.
    """
    rgb_image = Image.open(BytesIO(image_bytes)).convert("RGB")
    with BytesIO() as pdf_stream:
        rgb_image.save(pdf_stream, format="PDF", save_all=True)
        return pdf_stream.getvalue()
def read_fn(path: "str | Path") -> bytes:
    """
    Read a supported input file and return its content as PDF bytes.

    Files whose suffix is listed in ``image_suffixes`` are converted to a PDF
    via ``images_bytes_to_pdf_bytes``; files whose suffix is listed in
    ``pdf_suffixes`` are returned unchanged.

    Args:
        path: Location of the file, as a string or ``Path``.

    Returns:
        The PDF bytes for the file.

    Raises:
        ValueError: If the suffix is in neither suffix list.
        OSError: If the file cannot be opened or read.
    """
    path = Path(path)
    file_bytes = path.read_bytes()

    # The suffix lists are lower-case; normalise so "SCAN.PDF" or "photo.JPG"
    # are accepted instead of being reported as unknown.
    suffix = path.suffix.lower()
    if suffix in image_suffixes:
        return images_bytes_to_pdf_bytes(file_bytes)
    if suffix in pdf_suffixes:
        return file_bytes
    raise ValueError(f"Unknown file suffix: {path.suffix}")
def safe_stem(file_path: str) -> str:
    """
    Return the file name of ``file_path`` without its final suffix, sanitised.

    Every character that is neither a word character (``\\w``) nor a dot is
    replaced by an underscore.
    """
    unsafe_chars = re.compile(r'[^\w.]')
    raw_stem = Path(file_path).stem
    return unsafe_chars.sub('_', raw_stem)
def to_pdf(file_path: str) -> "str | None":
    """
    Materialise the input file as a PDF on disk and return that PDF's path.

    The file is read through ``read_fn`` (which converts images to PDF bytes)
    and written into the system temporary directory.

    Args:
        file_path: Path of the uploaded PDF or image, or ``None``.

    Returns:
        Path of the written PDF, or ``None`` when ``file_path`` is ``None``.

    Raises:
        Whatever ``read_fn`` raises for unreadable or unsupported files.
    """
    if file_path is None:
        return None

    pdf_bytes = read_fn(file_path)

    # The stem alone is not unique: two different uploads that share a file
    # name would overwrite each other in the shared temp directory. A short
    # content digest keeps the name readable, makes distinct files land on
    # distinct paths, and maps repeated calls for the same file to one path.
    digest = hashlib.sha256(pdf_bytes).hexdigest()[:16]
    unique_filename = f'{safe_stem(file_path)}_{digest}.pdf'

    # tempfile.gettempdir() is the system temporary directory.
    tmp_file_path = os.path.join(tempfile.gettempdir(), unique_filename)
    with open(tmp_file_path, 'wb') as tmp_pdf_file:
        tmp_pdf_file.write(pdf_bytes)
    return tmp_file_path
def _extract_page_images(pdf_document, page) -> list:
    """Return every image referenced by ``page`` as a base64 ``data:`` URI string."""
    data_uris = []
    for img in page.get_images(full=True):
        xref = img[0]  # first entry of each image record is its xref
        base_image = pdf_document.extract_image(xref)
        encoded = base64.b64encode(base_image["image"]).decode()
        data_uris.append(f"data:image/{base_image['ext']};base64,{encoded}")
    return data_uris


def _render_page(page, dpi: int = 200) -> Image.Image:
    """Rasterise ``page`` at ``dpi`` and return it as a PIL image."""
    zoom = dpi / 72.0  # PDF user space is 72 units per inch
    pix = page.get_pixmap(matrix=fitz.Matrix(zoom, zoom))
    return Image.open(BytesIO(pix.tobytes("png")))


def _inject_images(html_content: str, page_images_base64: list) -> str:
    """Insert the extracted images into the model's HTML and return the new HTML."""
    img_style = "max-width:100%; height:auto;"
    soup = BeautifulSoup(html_content, 'html.parser')
    figures = soup.find_all('figure')

    if len(figures) == len(page_images_base64):
        # Same count: pair figures and images positionally.
        for fig, b64_img in zip(figures, page_images_base64):
            fig.append(soup.new_tag('img', src=b64_img, style=img_style))
    else:
        # Counts differ, so pairing is not possible: append every image at
        # the end of the page content instead.
        logger.warning(f" > Mismatch: Model found {len(figures)} figures, but {len(page_images_base64)} images were extracted. Appending images to the end.")
        for b64_img in page_images_base64:
            p_tag = soup.new_tag('p')
            p_tag.append(soup.new_tag('img', src=b64_img, style=img_style))
            soup.append(p_tag)
    return str(soup)


async def pdf_parse(file_path: str, request: gr.Request):
    """
    Parse an uploaded PDF or image into HTML and Markdown.

    Each page is rendered to an image and passed to ``parse_page``; images
    embedded in the PDF page are extracted with PyMuPDF and injected into the
    generated HTML. The combined HTML is then converted to Markdown, and the
    Markdown is also written to a temporary ``.md`` file.

    Args:
        file_path: Path of the uploaded file, or ``None`` if nothing was uploaded.
        request: The Gradio request object; not used by this function.

    Returns:
        A 6-tuple ``(rendered_markdown_html, markdown_source, generated_html,
        markdown_file_path, pdf_path, timing_text)``. On any failure the two
        path entries are ``None`` and the text entries carry an error message;
        exceptions are logged rather than raised.
    """
    # NOTE(review): this coroutine runs file I/O and model inference
    # synchronously; presumably that blocks the event loop while a document is
    # processed - confirm whether that is acceptable for this deployment.
    if file_path is None:
        logger.warning("file_path is None")
        return (
            "<p>Please upload a PDF file</p>", "", "<p>No input file</p>",
            None, None, "Error: No file provided"
        )

    logger.info(f'Processing file: {file_path}')

    # to_pdf raises (rather than returning None) for unreadable or unsupported
    # files; route those failures into the same error result.
    try:
        tmp_pdf_path = to_pdf(file_path)
    except Exception as e:
        logger.exception(f"Failed to prepare file: {e}")
        tmp_pdf_path = None

    if tmp_pdf_path is None:
        return (
            "<p>Failed to process file</p>", "", "<p>Processing error</p>",
            None, None, "Error: Failed to process file"
        )

    start_time = time.time()
    try:
        html_parts = []
        # The context manager closes the document even when a page fails.
        with fitz.open(tmp_pdf_path) as pdf_document:
            page_count = len(pdf_document)
            for page_num in range(page_count):
                page = pdf_document.load_page(page_num)
                logger.info(f"Processing Page {page_num + 1}/{page_count}")

                # --- 1. Extract images directly from the PDF page using PyMuPDF ---
                page_images_base64 = _extract_page_images(pdf_document, page)
                logger.info(f" > Found {len(page_images_base64)} images on page {page_num + 1}.")

                # --- 2. Render the page to an image for the VL-Model ---
                page_image = _render_page(page, dpi=200)

                # --- 3. Get the structured HTML from the model ---
                logger.info(" > Parsing page layout with Logics-Parsing model...")
                html_content = parse_page(page_image)

                # --- 4. Inject extracted images back into the HTML ---
                if page_images_base64:
                    logger.info(f" > Injecting {len(page_images_base64)} extracted images into generated HTML...")
                    html_content = _inject_images(html_content, page_images_base64)

                html_parts.append(f'<div class="page-{page_num+1}">{html_content}</div>')

        full_html = '\n'.join(html_parts)
        parsing_time = time.time() - start_time

        # Convert final rich HTML to Markdown, then render that Markdown back
        # to HTML for the preview tab.
        mmd = html2text.html2text(full_html)
        mmd_html = markdown.markdown(mmd)
        qwen_html = full_html

        # Create a temporary markdown file for download; delete=False keeps it
        # on disk after the handle is closed.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
            f.write(mmd)
            md_path = f.name

        cost_time = f'Parsing time: {parsing_time:.2f}s, Total time: {parsing_time:.2f}s'
        return mmd_html, mmd, qwen_html, md_path, tmp_pdf_path, cost_time
    except Exception as e:
        # Top-level boundary for the UI: log with traceback and report the
        # error in the outputs instead of raising.
        logger.exception(f"Parsing failed: {e}")
        return (
            "<p>Parsing failed. Please try again.</p>", "", f"<p>Error: {str(e)}</p>",
            None, None, f"Error: {str(e)}"
        )
def main(ctx=None, **kwargs):
    """
    Build the Gradio interface, wire its events, and launch the app.

    Args:
        ctx: Optional context object; not used by this function. It defaults
            to ``None`` so the module's entry point can call ``main()``
            without arguments.
        **kwargs: Accepted for compatibility; not used by this function.
    """
    # Replace Loguru's default handler with an INFO-level handler on stdout.
    logger.remove()
    logger.add(sys.stdout, level="INFO")

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 📄 Logics-Parsing Document Analysis")
        gr.Markdown("Upload a PDF or image file to parse its content into structured Markdown and HTML formats, now with improved image extraction.")

        with gr.Row():
            # Left panel: upload, actions, PDF preview and optional examples.
            with gr.Column(variant='panel', scale=5):
                with gr.Row():
                    input_file = gr.File(label='Please upload a PDF or image (Max 20 pages for conversion)',
                                         file_types=pdf_suffixes + image_suffixes)
                with gr.Row():
                    change_bu = gr.Button('Convert', variant='primary')
                    clear_bu = gr.ClearButton(value='Clear')
                pdf_show = PDF(label='PDF Preview', interactive=False, visible=True, height=800)

                example_root = 'parsing/examples'
                logger.info(f'Looking for examples in: {example_root}')
                # isdir() is already False for a missing path.
                if os.path.isdir(example_root):
                    supported_suffixes = tuple(pdf_suffixes + image_suffixes)
                    example_files = [
                        os.path.join(example_root, f)
                        for f in os.listdir(example_root)
                        if f.endswith(supported_suffixes)
                    ]
                    if example_files:
                        with gr.Accordion('Examples:', open=True):
                            gr.Examples(examples=example_files, inputs=input_file)

            # Right panel: download link, timing, and the three result views.
            with gr.Column(variant='panel', scale=5):
                output_file = gr.File(label='Download Markdown Result', interactive=False)
                cost_time = gr.Text(label='Time Cost', interactive=False)
                with gr.Tabs():
                    with gr.Tab('Markdown Rendering'):
                        mmd_html = gr.HTML(label='MMD Rendering')
                    with gr.Tab('Markdown Source'):
                        mmd = gr.TextArea(lines=45, show_copy_button=True, label="Markdown Source")
                    with gr.Tab('Generated HTML'):
                        raw_html = gr.TextArea(lines=45, show_copy_button=True, label="Generated HTML")

        components_to_clear = [input_file, pdf_show, mmd, raw_html, output_file, mmd_html, cost_time]
        clear_bu.add(components_to_clear)

        # Selecting a file converts it to a PDF and shows it in the preview.
        input_file.change(fn=to_pdf, inputs=input_file, outputs=pdf_show, show_progress="full")
        # The output order must match the tuple returned by pdf_parse.
        change_bu.click(
            fn=pdf_parse,
            inputs=[input_file],
            outputs=[mmd_html, mmd, raw_html, output_file, pdf_show, cost_time],
            concurrency_limit=15,
            show_progress="full"
        )

    demo.launch(debug=True)


if __name__ == '__main__':
    main()