Spaces:
Running
Running
import html
import io
import json
import os
import sys
import tempfile
import time
from pathlib import Path

import cv2
import numpy as np
import streamlit as st
from pdf2image import convert_from_bytes
from PIL import Image, ImageEnhance, ImageFilter

# Import the StructuredOCR class and config from the local files
from structured_ocr import StructuredOCR
from config import MISTRAL_API_KEY
# Page-level Streamlit settings: browser-tab title and icon, wide layout,
# and a sidebar that starts expanded.
_PAGE_SETTINGS = {
    "page_title": "Historical OCR",
    "page_icon": "🚀",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# PDF rasterisation helper.
# NOTE(review): earlier comments described this as cached, but no cache
# decorator is applied to the function; every call re-renders the PDF.
def convert_pdf_to_images(pdf_bytes, dpi=150):
    """Render a PDF into a list of page images.

    Args:
        pdf_bytes: Raw bytes of the PDF document.
        dpi: Rendering resolution handed to ``convert_from_bytes``.

    Returns:
        The list produced by ``convert_from_bytes``, or an empty list when
        conversion fails (the failure is reported with ``st.error``).
    """
    pages = []
    try:
        pages = convert_from_bytes(pdf_bytes, dpi=dpi)
    except Exception as exc:
        st.error(f"Error converting PDF: {exc}")
    return pages
def _load_rgb_array(image_bytes):
    """Decode image bytes into an 8-bit, 3-channel RGB numpy array."""
    with Image.open(io.BytesIO(image_bytes)) as source:
        has_alpha = source.mode in ("RGBA", "LA") or "transparency" in source.info
        if not has_alpha:
            return np.array(source.convert("RGB"))
        # Flatten transparency onto white so transparent regions do not turn
        # into arbitrary (often black) pixels once the alpha channel is gone.
        rgba = source.convert("RGBA")
        canvas = Image.new("RGB", rgba.size, (255, 255, 255))
        canvas.paste(rgba, mask=rgba.split()[-1])
        return np.array(canvas)


def preprocess_image(image_bytes, preprocessing_options):
    """Apply the selected clean-up steps to an image and return PNG bytes.

    Steps run in a fixed order: grayscale, contrast, denoise, threshold.

    Args:
        image_bytes: Encoded image data in any format Pillow can open.
        preprocessing_options: Mapping with the optional keys ``"grayscale"``
            (bool), ``"contrast"`` (number, 0 leaves the image unchanged),
            ``"denoise"`` (bool) and ``"threshold"`` (bool). Missing keys are
            treated as disabled.

    Returns:
        bytes: The processed image encoded as PNG.
    """
    # Normalise to RGB first. Uploads may be RGBA, palette or single-channel,
    # while every OpenCV call below is written for a 3-channel RGB array.
    img_array = _load_rgb_array(image_bytes)

    if preprocessing_options.get("grayscale", False):
        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        # Back to three channels so later steps see a uniform layout.
        img_array = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

    contrast = preprocessing_options.get("contrast", 0)
    if contrast != 0:
        # A setting of c maps to an enhancement factor of 1 + c/10.
        enhancer = ImageEnhance.Contrast(Image.fromarray(img_array))
        img_array = np.array(enhancer.enhance(1 + (contrast / 10)))

    if preprocessing_options.get("denoise", False):
        img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 10, 10, 7, 21)

    if preprocessing_options.get("threshold", False):
        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        binary = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, 11, 2
        )
        img_array = cv2.cvtColor(binary, cv2.COLOR_GRAY2RGB)

    buffer = io.BytesIO()
    Image.fromarray(img_array).save(buffer, format='PNG')
    return buffer.getvalue()
| # Define functions | |
def process_file(uploaded_file, use_vision=True, preprocessing_options=None):
    """Run OCR on an uploaded file and return the result.

    The upload is written to a temporary file whose path is handed to
    ``StructuredOCR.process_file``. For image uploads with any preprocessing
    option enabled, the image is first run through ``preprocess_image`` and
    the processed copy is sent instead. Progress is reported through a
    Streamlit progress bar and status line. Every temporary file created here
    is removed before returning.

    Args:
        uploaded_file: The uploaded file to process; must expose ``name`` and
            ``getvalue()``.
        use_vision: Whether to use vision model (forwarded to the processor).
        preprocessing_options: Dictionary of preprocessing options; ``None``
            is treated as an empty dictionary.

    Returns:
        The value returned by ``StructuredOCR.process_file``; a placeholder
        dictionary when ``MISTRAL_API_KEY`` is not set; or an error-describing
        dictionary when the file to send exceeds 50 MB.

    Raises:
        Exception: Any failure during processing is shown with ``st.error``
            and then re-raised.
    """
    if preprocessing_options is None:
        preprocessing_options = {}

    # Show progress indicator
    progress_bar = st.progress(0)
    status_text = st.empty()
    status_text.text("Preparing file for processing...")

    file_bytes = uploaded_file.getvalue()
    original_suffix = Path(uploaded_file.name).suffix

    # Every temp file is recorded so the `finally` clause can delete all of
    # them, not only the one that `temp_path` happens to point at last.
    temp_paths = []

    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=original_suffix) as tmp:
            temp_paths.append(tmp.name)
            tmp.write(file_bytes)
        temp_path = tmp.name

        if not MISTRAL_API_KEY:
            # Return dummy data if no API key
            progress_bar.progress(100)
            status_text.empty()
            return {
                "file_name": uploaded_file.name,
                "topics": ["Sample Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "title": "Sample Document",
                    "content": "This is sample content. To process real documents, please set the MISTRAL_API_KEY environment variable."
                }
            }

        progress_bar.progress(20)
        status_text.text("Initializing OCR processor...")
        processor = StructuredOCR()

        # Determine file type from extension
        file_type = "pdf" if original_suffix.lower() == ".pdf" else "image"

        if file_type == "image" and any(preprocessing_options.values()):
            status_text.text("Applying image preprocessing...")
            processed_bytes = preprocess_image(file_bytes, preprocessing_options)
            # preprocess_image always emits PNG data, so the file is given a
            # matching extension regardless of what was uploaded.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as proc_tmp:
                temp_paths.append(proc_tmp.name)
                proc_tmp.write(processed_bytes)
            temp_path = proc_tmp.name

        file_size_mb = os.path.getsize(temp_path) / (1024 * 1024)

        # Check if file exceeds API limits (50 MB)
        if file_size_mb > 50:
            progress_bar.progress(100)
            status_text.empty()
            st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size allowed by Mistral API is 50MB.")
            return {
                "file_name": uploaded_file.name,
                "topics": ["Document"],
                "languages": ["English"],
                "confidence_score": 0.0,
                "error": f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                "ocr_contents": {
                    "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                    "partial_text": "Document could not be processed due to size limitations."
                }
            }

        progress_bar.progress(40)
        status_text.text("Processing document with OCR...")

        # The file size is forwarded along with the path and file type.
        # See https://docs.mistral.ai/capabilities/document/ for more info
        result = processor.process_file(
            temp_path,
            file_type=file_type,
            use_vision=use_vision,
            file_size_mb=file_size_mb,
        )

        progress_bar.progress(100)
        status_text.empty()
        return result
    except Exception as e:
        progress_bar.progress(100)
        status_text.empty()
        st.error(f"Error during processing: {str(e)}")
        raise
    finally:
        # Clean up all temporary files created above
        for path in temp_paths:
            if os.path.exists(path):
                os.unlink(path)
# Page heading
st.title("Historical Document OCR")
st.subheader("Powered by Mistral AI")

# Short instructions rendered above the uploader
_UPLOAD_HINT = """
Upload an image or PDF file to get started.
Using the latest `mistral-ocr-latest` model for advanced document understanding.
"""

# File extensions accepted by the uploader
_ACCEPTED_TYPES = ["pdf", "png", "jpg", "jpeg"]

# Two top-level tabs: the processing workflow and an About page
_main_tabs = st.tabs(["Document Processing", "About"])
main_tab1, main_tab2 = _main_tabs

with main_tab1:
    # Equal-width columns: upload controls on the left, preview on the right
    _columns = st.columns([1, 1])
    upload_col, preview_col = _columns

    with upload_col:
        st.markdown(_UPLOAD_HINT)
        uploaded_file = st.file_uploader("Choose a file", type=_ACCEPTED_TYPES)
# Sidebar: model, image-preprocessing and PDF settings
with st.sidebar:
    st.header("Options")

    # Model settings
    st.subheader("Model Settings")
    use_vision = st.checkbox(
        "Use Vision Model",
        value=True,
        help="For image files, use the vision model for improved analysis (may be slower)",
    )

    # Image preprocessing settings, collected into a single dictionary.
    # The widgets are created in the order the entries are written.
    st.subheader("Image Preprocessing")
    with st.expander("Preprocessing Options"):
        preprocessing_options = {
            "grayscale": st.checkbox(
                "Convert to Grayscale",
                help="Convert image to grayscale before OCR",
            ),
            "threshold": st.checkbox(
                "Apply Thresholding",
                help="Apply adaptive thresholding to enhance text",
            ),
            "denoise": st.checkbox(
                "Denoise Image",
                help="Remove noise from the image",
            ),
            "contrast": st.slider(
                "Adjust Contrast",
                min_value=-5,
                max_value=5,
                value=0,
                help="Adjust image contrast (-5 to +5)",
            ),
        }

    # PDF settings
    # NOTE(review): pdf_dpi and max_pages are collected here, but no code
    # visible in this file passes them on to the OCR call — confirm intent.
    st.subheader("PDF Options")
    with st.expander("PDF Settings"):
        pdf_dpi = st.slider(
            "PDF Resolution (DPI)",
            min_value=72,
            max_value=300,
            value=150,
            help="Higher DPI gives better quality but slower processing",
        )
        max_pages = st.number_input(
            "Maximum Pages to Process",
            min_value=1,
            max_value=20,
            value=5,
            help="Limit number of pages to process",
        )
# About tab content.
# Blank lines separate each paragraph from the preceding list; without them
# Markdown treats the paragraph as a continuation of the last bullet.
_ABOUT_MARKDOWN = """
### About This Application

This app uses [Mistral AI's Document OCR](https://docs.mistral.ai/capabilities/document/) to extract text and images from historical documents.

It can process:
- Image files (jpg, png, etc.)
- PDF documents (multi-page support)

The extracted content is processed into structured data based on the document type, combining:
- Text extraction with `mistral-ocr-latest`
- Analysis with language models
- Layout preservation with images

View results in three formats:
- Structured HTML view
- Raw JSON (for developers)
- Markdown with images (preserves document layout)

**New Features:**
- Image preprocessing for better OCR quality
- PDF resolution and page controls
- Progress tracking during processing
"""

with main_tab2:
    st.markdown(_ABOUT_MARKDOWN)
# ---------------------------------------------------------------------------
# Rendering helpers for the "Document Processing" tab
# ---------------------------------------------------------------------------

# Styling for the container that wraps the Markdown-with-images view.
_IMAGE_VIEW_CSS = """
<style>
.markdown-text-container {
    padding: 10px;
    background-color: #f9f9f9;
    border-radius: 5px;
}
.markdown-text-container img {
    margin: 15px 0;
    max-width: 100%;
    border: 1px solid #ddd;
    border-radius: 4px;
    display: block;
}
.markdown-text-container p {
    margin-bottom: 16px;
    line-height: 1.6;
}
</style>
"""


def _section_heading(section):
    """Turn an OCR section key such as ``main_text`` into ``Main Text``."""
    return section.replace('_', ' ').title()


def _build_image_export(combined_markdown):
    """Wrap the combined Markdown in a minimal standalone HTML page.

    NOTE(review): the body is inserted as-is (Markdown, not converted HTML).
    """
    return f"""
<html>
<head>
<meta charset="utf-8">
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; }}
img {{ max-width: 100%; margin: 15px 0; }}
</style>
</head>
<body>
{combined_markdown}
</body>
</html>
"""


def _show_pdf_preview(pdf_file):
    """Show the first page of an uploaded PDF, or just its name on failure."""
    try:
        pages = convert_from_bytes(pdf_file.getvalue(), first_page=1, last_page=1, dpi=150)
        if pages:
            # Re-encode the rendered page as JPEG bytes for st.image
            buffer = io.BytesIO()
            pages[0].save(buffer, format='JPEG')
            buffer.seek(0)
            st.image(buffer, caption=f"PDF Preview: {pdf_file.name}", use_container_width=True)
        else:
            st.info(f"PDF uploaded: {pdf_file.name}")
    except Exception:
        # The preview is best-effort: deliberately show the file name
        # instead of an error message when rendering fails.
        st.info(f"PDF uploaded: {pdf_file.name}")
        st.info("Click 'Process Document' to analyze the content.")


def _show_preprocessing_preview(image_file, options):
    """Show the original image next to its preprocessed version."""
    with st.expander("Image Preprocessing Preview"):
        original_col, processed_col = st.columns(2)
        with original_col:
            st.markdown("**Original Image**")
            st.image(image_file, use_container_width=True)
        with processed_col:
            st.markdown("**Preprocessed Image**")
            try:
                processed_bytes = preprocess_image(image_file.getvalue(), options)
                st.image(io.BytesIO(processed_bytes), use_container_width=True)
            except Exception as e:
                st.error(f"Error in preprocessing: {str(e)}")


def _render_metadata(result, fallback_name):
    """Render file name, page limits, languages, confidence and topics."""
    st.subheader("Document Metadata")

    # A result that carries an "error" entry must not be reported as a success.
    error_message = result.get('error')
    if error_message:
        st.warning(f"**Document could not be fully processed:** {error_message}")
    else:
        st.success("**Document processed successfully**")

    st.write(f"**File Name:** {result.get('file_name', fallback_name)}")

    if 'limited_pages' in result:
        st.info(f"Processed {result['limited_pages']['processed']} of {result['limited_pages']['total']} pages")

    # None entries (or a None list) are skipped rather than crashing join()
    languages = [str(lang) for lang in (result.get('languages') or []) if lang is not None]
    if languages:
        st.write(f"**Languages:** {', '.join(languages)}")

    # Only format the score as a percentage when it really is a number
    confidence = result.get('confidence_score')
    if isinstance(confidence, (int, float)):
        st.write(f"**OCR Confidence:** {confidence:.1%}")

    topics = [str(topic) for topic in (result.get('topics') or []) if topic is not None]
    if topics:
        st.write(f"**Topics:** {', '.join(topics)}")


def _render_structured_view(ocr_contents):
    """Render OCR sections in the current container and return them as HTML.

    The returned HTML mirrors what is displayed and is used for the export
    download. All OCR-derived text is HTML-escaped, because it comes from the
    processed document and may contain ``<``, ``>`` or ``&``.
    """
    if not isinstance(ocr_contents, dict):
        return ""

    html_parts = []
    for section, content in ocr_contents.items():
        if not content:  # Only display non-empty sections
            continue

        heading = _section_heading(section)
        html_parts.append(f"<h4>{html.escape(heading)}</h4>")

        if isinstance(content, str):
            st.markdown(f"#### {heading}")
            st.markdown(content)
            html_parts.append(f"<p>{html.escape(content)}</p>")
        elif isinstance(content, list):
            st.markdown(f"#### {heading}")
            list_items = []
            for item in content:
                if isinstance(item, str):
                    st.markdown(f"- {item}")
                    list_items.append(f"<li>{html.escape(item)}</li>")
                elif isinstance(item, dict):
                    st.json(item)
                    list_items.append(f"<li>{html.escape(json.dumps(item))}</li>")
            html_parts.append("<ul>" + "".join(list_items) + "</ul>")
        elif isinstance(content, dict):
            st.markdown(f"#### {heading}")
            entries = []
            for k, v in content.items():
                st.markdown(f"**{k}:** {v}")
                entries.append(
                    f"<dt><strong>{html.escape(str(k))}</strong></dt>"
                    f"<dd>{html.escape(str(v))}</dd>"
                )
            html_parts.append("<dl>" + "".join(entries) + "</dl>")

    return "".join(html_parts)


def _has_embedded_images(result):
    """Return True when any page of the raw OCR response carries images."""
    if 'raw_response' not in result:
        return False
    try:
        return any(page.images for page in result['raw_response'].pages)
    except Exception:
        # Best-effort probe: an unexpected response shape means "no images".
        return False


def _render_images_view(result):
    """Render the OCR output as Markdown with its embedded images.

    Problems are reported inside this view and the function returns early,
    so the rest of the page (including the Technical Details tab) still
    renders.
    """
    try:
        try:
            from ocr_utils import get_combined_markdown
        except ImportError:
            st.error("Required module ocr_utils not found.")
            return

        if 'raw_response' not in result:
            st.warning("Raw OCR response not available. Cannot display images.")
            return

        raw_response = result['raw_response']
        if not hasattr(raw_response, 'pages'):
            st.warning("Invalid OCR response format. Cannot display images.")
            return

        combined_markdown = get_combined_markdown(raw_response)
        if not combined_markdown or not combined_markdown.strip():
            st.warning("No image content found in the document.")
            return

        st.markdown(_IMAGE_VIEW_CSS, unsafe_allow_html=True)

        # The wrapper is built flush-left, with blank lines around the
        # content, so that the content is parsed as Markdown instead of being
        # swallowed as raw text of the surrounding HTML block.
        st.markdown(
            f'<div class="markdown-text-container">\n\n{combined_markdown}\n\n</div>',
            unsafe_allow_html=True,
        )

        st.download_button(
            label="Download with Images (HTML)",
            data=_build_image_export(combined_markdown),
            file_name="document_with_images.html",
            mime="text/html"
        )
    except Exception as e:
        st.error(f"Could not display document with images: {str(e)}")
        st.info("Try refreshing or processing the document again.")


def _render_contents(result):
    """Render the OCR contents in Structured / Raw JSON / With Images tabs."""
    has_images = _has_embedded_images(result)

    tab_labels = ["Structured View", "Raw JSON"]
    if has_images:
        tab_labels.append("With Images")
    view_tabs = st.tabs(tab_labels)

    with view_tabs[0]:
        html_content = _render_structured_view(result['ocr_contents'])
        with st.expander("Export Content"):
            # Declare the encoding so non-ASCII text opens correctly
            export_bytes = ('<meta charset="utf-8">' + html_content).encode("utf-8")
            st.download_button(
                label="Download as HTML",
                data=export_bytes,
                file_name="document_content.html",
                mime="text/html"
            )

    with view_tabs[1]:
        # Show the raw JSON for developers
        st.json(result)

    if has_images:
        with view_tabs[2]:
            with st.spinner("Preparing document with embedded images..."):
                _render_images_view(result)


def _render_results(result, fallback_name):
    """Render the analysis and technical-details tabs for an OCR result."""
    analysis_tab, technical_tab = st.tabs(["Document Analysis", "Technical Details"])

    with analysis_tab:
        meta_col, content_col = st.columns([1, 2])
        with meta_col:
            _render_metadata(result, fallback_name)
        with content_col:
            st.subheader("Document Contents")
            if 'ocr_contents' in result:
                _render_contents(result)
            else:
                st.error("No OCR content was extracted from the document.")

    with technical_tab:
        st.subheader("Raw Processing Results")
        st.json(result)


def _show_sample_documents():
    """Show up to three sample JPEGs from the ``input`` directory, if any."""
    st.info("Upload a document to get started using the file uploader above.")
    st.subheader("Example Documents")

    with st.container():
        input_dir = Path(__file__).parent / "input"
        sample_images = []
        if input_dir.exists():
            # Sorted so the same three samples are shown on every run
            sample_images = sorted(input_dir.glob("*.jpg"))[:3]

        if sample_images:
            columns = st.columns(3)
            for column, img_path in zip(columns, sample_images):
                with column:
                    st.image(str(img_path), caption=img_path.name, use_container_width=True)


# ---------------------------------------------------------------------------
# Main flow of the "Document Processing" tab
# ---------------------------------------------------------------------------
with main_tab1:
    if uploaded_file is None:
        _show_sample_documents()
    else:
        # Check file size (cap at 50MB)
        file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024)
        if file_size_mb > 50:
            with upload_col:
                st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is 50MB.")
            st.stop()

        file_ext = Path(uploaded_file.name).suffix.lower()

        # Document preview in the right-hand column
        with preview_col:
            st.subheader("Document Preview")
            if file_ext == ".pdf":
                _show_pdf_preview(uploaded_file)
            else:
                st.image(uploaded_file, use_container_width=True)

        # Side-by-side preprocessing preview, only for images and only when
        # at least one preprocessing option is active
        if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'):
            _show_preprocessing_preview(uploaded_file, preprocessing_options)

        # Process button sits under the uploader in the left-hand column
        with upload_col:
            process_button = st.button("Process Document", use_container_width=True)

        if process_button:
            # NOTE(review): the sidebar's PDF settings (pdf_dpi, max_pages)
            # are not forwarded; process_file has no parameter for them.
            try:
                result = process_file(uploaded_file, use_vision, preprocessing_options)
                _render_results(result, uploaded_file.name)
            except Exception as e:
                st.error(f"Error processing document: {str(e)}")