milwright's picture
submit pull for merge
85bdb4e verified
raw
history blame
26.5 kB
import html
import io
import json
import os
import sys
import tempfile
import time
from pathlib import Path

import cv2
import numpy as np
import streamlit as st
from pdf2image import convert_from_bytes
from PIL import Image, ImageEnhance, ImageFilter

# Import the StructuredOCR class and config from the local files
from structured_ocr import StructuredOCR
from config import MISTRAL_API_KEY
# Page-level Streamlit settings for this app
st.set_page_config(
    page_title="Historical OCR",
    page_icon="🚀",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Results are cached through st.cache_data (ttl=3600) so reruns can reuse them
@st.cache_data(ttl=3600, show_spinner=False)
def convert_pdf_to_images(pdf_bytes, dpi=150):
    """Convert PDF bytes into page images via ``convert_from_bytes``.

    Args:
        pdf_bytes: Raw bytes of the PDF document.
        dpi: Resolution forwarded to ``convert_from_bytes``.

    Returns:
        The value returned by ``convert_from_bytes`` on success, or an empty
        list when the conversion raises (the error is shown with ``st.error``).
    """
    try:
        pages = convert_from_bytes(pdf_bytes, dpi=dpi)
    except Exception as e:
        st.error(f"Error converting PDF: {str(e)}")
        pages = []
    return pages
@st.cache_data(ttl=3600, show_spinner=False)
def preprocess_image(image_bytes, preprocessing_options):
    """Apply the selected preprocessing steps to an image and return PNG bytes.

    Steps run in a fixed order: grayscale, contrast, denoise, threshold.

    Args:
        image_bytes: Encoded image data readable by ``PIL.Image.open``.
        preprocessing_options: Mapping with the optional keys ``"grayscale"``
            (bool), ``"contrast"`` (number, 0 leaves the image unchanged),
            ``"denoise"`` (bool) and ``"threshold"`` (bool).

    Returns:
        The processed image encoded as PNG bytes. The output is always RGB.
    """
    image = Image.open(io.BytesIO(image_bytes))
    # Normalize to 3-channel RGB first. The OpenCV calls below
    # (COLOR_RGB2GRAY, fastNlMeansDenoisingColored) are written for colour
    # input, so grayscale, palette or alpha-channel uploads would otherwise
    # fail or be misinterpreted. Any alpha channel is dropped here.
    if image.mode != "RGB":
        image = image.convert("RGB")
    img_array = np.array(image)

    if preprocessing_options.get("grayscale", False):
        # Round-trip through gray so the array keeps three channels
        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        img_array = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)

    contrast = preprocessing_options.get("contrast", 0)
    if contrast != 0:
        # Slider value c maps to an enhancement factor of 1 + c / 10
        contrast_factor = 1 + (contrast / 10)
        enhancer = ImageEnhance.Contrast(Image.fromarray(img_array))
        img_array = np.array(enhancer.enhance(contrast_factor))

    if preprocessing_options.get("denoise", False):
        img_array = cv2.fastNlMeansDenoisingColored(img_array, None, 10, 10, 7, 21)

    if preprocessing_options.get("threshold", False):
        # Adaptive thresholding works on a single channel; convert back to
        # RGB afterwards so the output format stays uniform
        gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
        binary = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                       cv2.THRESH_BINARY, 11, 2)
        img_array = cv2.cvtColor(binary, cv2.COLOR_GRAY2RGB)

    # Encode the result as PNG
    byte_io = io.BytesIO()
    Image.fromarray(img_array).save(byte_io, format='PNG')
    return byte_io.getvalue()
# Define functions
def process_file(uploaded_file, use_vision=True, preprocessing_options=None):
    """Process the uploaded file and return the OCR results.

    Args:
        uploaded_file: The uploaded file to process; must expose ``name`` and
            ``getvalue()``.
        use_vision: Whether to use the vision model (forwarded to
            ``StructuredOCR.process_file``).
        preprocessing_options: Dictionary of preprocessing options. They are
            applied only to non-PDF files and only if at least one value is
            truthy.

    Returns:
        The result of ``StructuredOCR.process_file``; a placeholder dict when
        ``MISTRAL_API_KEY`` is not set; or an error dict when the file to send
        is larger than 50 MB.

    Raises:
        Exception: Any error raised while processing is shown with
            ``st.error`` and then re-raised unchanged.
    """
    if preprocessing_options is None:
        preprocessing_options = {}

    # Show progress indicator
    progress_bar = st.progress(0)
    status_text = st.empty()
    status_text.text("Preparing file for processing...")

    # Every temporary file created below is recorded here so that the
    # finally block removes all of them, not just the last one assigned.
    temp_paths = []
    try:
        # Save the uploaded file to a temporary file
        with tempfile.NamedTemporaryFile(delete=False, suffix=Path(uploaded_file.name).suffix) as tmp:
            temp_paths.append(tmp.name)
            tmp.write(uploaded_file.getvalue())
        temp_path = tmp.name

        # Check if API key is available
        if not MISTRAL_API_KEY:
            # Return dummy data if no API key
            return {
                "file_name": uploaded_file.name,
                "topics": ["Sample Document"],
                "languages": ["English"],
                "ocr_contents": {
                    "title": "Sample Document",
                    "content": "This is sample content. To process real documents, please set the MISTRAL_API_KEY environment variable."
                }
            }

        # Update progress
        progress_bar.progress(20)
        status_text.text("Initializing OCR processor...")

        # Initialize OCR processor
        processor = StructuredOCR()

        # Determine file type from extension
        file_ext = Path(uploaded_file.name).suffix.lower()
        file_type = "pdf" if file_ext == ".pdf" else "image"

        # Apply preprocessing if needed
        if any(preprocessing_options.values()) and file_type == "image":
            status_text.text("Applying image preprocessing...")
            processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options)
            # preprocess_image() returns PNG data, so the temp file gets a
            # .png suffix instead of the upload's original extension.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as proc_tmp:
                temp_paths.append(proc_tmp.name)
                proc_tmp.write(processed_bytes)
            temp_path = proc_tmp.name

        # Get file size in MB
        file_size_mb = os.path.getsize(temp_path) / (1024 * 1024)

        # Check if file exceeds API limits (50 MB)
        if file_size_mb > 50:
            st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size allowed by Mistral API is 50MB.")
            return {
                "file_name": uploaded_file.name,
                "topics": ["Document"],
                "languages": ["English"],
                "confidence_score": 0.0,
                "error": f"File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                "ocr_contents": {
                    "error": f"Failed to process file: File size {file_size_mb:.2f} MB exceeds Mistral API limit of 50 MB",
                    "partial_text": "Document could not be processed due to size limitations."
                }
            }

        # Update progress
        progress_bar.progress(40)
        status_text.text("Processing document with OCR...")

        # Process the file with file size information for automatic page limiting
        # See https://docs.mistral.ai/capabilities/document/ for more info
        return processor.process_file(temp_path, file_type=file_type, use_vision=use_vision, file_size_mb=file_size_mb)
    except Exception as e:
        st.error(f"Error during processing: {str(e)}")
        raise
    finally:
        # Finish the progress display on every exit path (success, early
        # return or error) and clean up all temporary files.
        progress_bar.progress(100)
        status_text.empty()
        for path in temp_paths:
            if os.path.exists(path):
                os.unlink(path)
# App title and description
st.title("Historical Document OCR")
st.subheader("Powered by Mistral AI")

# Top-level tabs: document processing and an about page
main_tab1, main_tab2 = st.tabs(["Document Processing", "About"])

with main_tab1:
    # Two equally weighted columns: upload controls and document preview
    upload_col, preview_col = st.columns([1, 1])

    # The uploader lives in the first column
    with upload_col:
        st.markdown("""
Upload an image or PDF file to get started.
Using the latest `mistral-ocr-latest` model for advanced document understanding.
""")
        uploaded_file = st.file_uploader("Choose a file", type=["pdf", "png", "jpg", "jpeg"])

# Sidebar with options
with st.sidebar:
    st.header("Options")

    # Model options
    st.subheader("Model Settings")
    use_vision = st.checkbox(
        "Use Vision Model",
        value=True,
        help="For image files, use the vision model for improved analysis (may be slower)",
    )

    # Image preprocessing options (collapsible)
    st.subheader("Image Preprocessing")
    with st.expander("Preprocessing Options"):
        # Widgets are created in the order the dict entries are written
        preprocessing_options = {
            "grayscale": st.checkbox(
                "Convert to Grayscale",
                help="Convert image to grayscale before OCR",
            ),
            "threshold": st.checkbox(
                "Apply Thresholding",
                help="Apply adaptive thresholding to enhance text",
            ),
            "denoise": st.checkbox(
                "Denoise Image",
                help="Remove noise from the image",
            ),
            "contrast": st.slider(
                "Adjust Contrast", -5, 5, 0,
                help="Adjust image contrast (-5 to +5)",
            ),
        }

    # PDF options (collapsible)
    st.subheader("PDF Options")
    with st.expander("PDF Settings"):
        pdf_dpi = st.slider(
            "PDF Resolution (DPI)", 72, 300, 150,
            help="Higher DPI gives better quality but slower processing",
        )
        max_pages = st.number_input(
            "Maximum Pages to Process", 1, 20, 5,
            help="Limit number of pages to process",
        )
# About tab content
with main_tab2:
    # Static description of the app, rendered as markdown
    about_text = """
### About This Application
This app uses [Mistral AI's Document OCR](https://docs.mistral.ai/capabilities/document/) to extract text and images from historical documents.
It can process:
- Image files (jpg, png, etc.)
- PDF documents (multi-page support)
The extracted content is processed into structured data based on the document type, combining:
- Text extraction with `mistral-ocr-latest`
- Analysis with language models
- Layout preservation with images
View results in three formats:
- Structured HTML view
- Raw JSON (for developers)
- Markdown with images (preserves document layout)
**New Features:**
- Image preprocessing for better OCR quality
- PDF resolution and page controls
- Progress tracking during processing
"""
    st.markdown(about_text)
def _render_structured_view(ocr_contents):
    """Show OCR sections as markdown and offer the same content as an HTML download.

    Args:
        ocr_contents: The ``ocr_contents`` value of the OCR result. Only a
            dict is rendered; its string, list and dict values each get their
            own layout.
    """
    html_parts = []
    if isinstance(ocr_contents, dict):
        for section, content in ocr_contents.items():
            if not content:  # Only display non-empty sections
                continue
            heading = section.replace('_', ' ').title()
            # Text in the export comes from the scanned document, so it is
            # escaped before being embedded in HTML.
            html_parts.append(f"<h4>{html.escape(heading)}</h4>")
            if isinstance(content, str):
                html_parts.append(f"<p>{html.escape(content)}</p>")
                st.markdown(f"#### {heading}")
                st.markdown(content)
            elif isinstance(content, list):
                st.markdown(f"#### {heading}")
                list_items = []
                for item in content:
                    if isinstance(item, str):
                        list_items.append(f"<li>{html.escape(item)}</li>")
                        st.markdown(f"- {item}")
                    elif isinstance(item, dict):
                        list_items.append(f"<li>{html.escape(json.dumps(item))}</li>")
                        st.json(item)
                html_parts.append("<ul>" + "".join(list_items) + "</ul>")
            elif isinstance(content, dict):
                st.markdown(f"#### {heading}")
                entries = []
                for k, v in content.items():
                    entries.append(
                        f"<dt><strong>{html.escape(str(k))}</strong></dt>"
                        f"<dd>{html.escape(str(v))}</dd>"
                    )
                    st.markdown(f"**{k}:** {v}")
                html_parts.append("<dl>" + "".join(entries) + "</dl>")

    # Add download button in a smaller section
    with st.expander("Export Content"):
        st.download_button(
            label="Download as HTML",
            data="".join(html_parts).encode(),
            file_name="document_content.html",
            mime="text/html"
        )


def _render_images_view(result):
    """Show the OCR markdown with embedded images and offer it as a download.

    Problems are reported inline and the function returns early. It does not
    call ``st.stop()``, because that would end the whole script run and the
    elements rendered after this view (the technical-details tab) would be
    missing.

    Args:
        result: OCR result dict; ``result['raw_response']`` is passed to
            ``ocr_utils.get_combined_markdown``.
    """
    # Show loading indicator while preparing images
    with st.spinner("Preparing document with embedded images..."):
        try:
            try:
                from ocr_utils import get_combined_markdown
            except ImportError:
                st.error("Required module ocr_utils not found.")
                return

            # Check if raw_response is available
            if 'raw_response' not in result:
                st.warning("Raw OCR response not available. Cannot display images.")
                return

            # Validate the raw_response structure before processing
            if not hasattr(result['raw_response'], 'pages'):
                st.warning("Invalid OCR response format. Cannot display images.")
                return

            # Get the combined markdown with images
            combined_markdown = get_combined_markdown(result['raw_response'])
            if not combined_markdown or combined_markdown.strip() == "":
                st.warning("No image content found in the document.")
                return

            # Add CSS to ensure proper spacing and handling of text and images
            st.markdown("""
<style>
.markdown-text-container {
padding: 10px;
background-color: #f9f9f9;
border-radius: 5px;
}
.markdown-text-container img {
margin: 15px 0;
max-width: 100%;
border: 1px solid #ddd;
border-radius: 4px;
display: block;
}
.markdown-text-container p {
margin-bottom: 16px;
line-height: 1.6;
}
</style>
""", unsafe_allow_html=True)

            # Wrap the markdown in a div with the class for styling
            st.markdown(f"""
<div class="markdown-text-container">
{combined_markdown}
</div>
""", unsafe_allow_html=True)

            # Add a download button for the combined content
            st.download_button(
                label="Download with Images (HTML)",
                data=f"""
<html>
<head>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; }}
img {{ max-width: 100%; margin: 15px 0; }}
</style>
</head>
<body>
{combined_markdown}
</body>
</html>
""",
                file_name="document_with_images.html",
                mime="text/html"
            )
        except Exception as e:
            st.error(f"Could not display document with images: {str(e)}")
            st.info("Try refreshing or processing the document again.")


with main_tab1:
    if uploaded_file is not None:
        # Check file size (cap at 50MB)
        file_size_mb = len(uploaded_file.getvalue()) / (1024 * 1024)
        if file_size_mb > 50:
            with upload_col:
                st.error(f"File too large ({file_size_mb:.1f} MB). Maximum file size is 50MB.")
            # Nothing else can be done with an oversized upload: end this run
            st.stop()

        file_ext = Path(uploaded_file.name).suffix.lower()

        # Display document preview in preview column
        with preview_col:
            st.subheader("Document Preview")
            if file_ext == ".pdf":
                try:
                    # Convert first page of PDF to image for preview
                    pdf_bytes = uploaded_file.getvalue()
                    images = convert_from_bytes(pdf_bytes, first_page=1, last_page=1, dpi=150)
                    if images:
                        # Convert PIL image to bytes for Streamlit
                        first_page = images[0]
                        img_bytes = io.BytesIO()
                        first_page.save(img_bytes, format='JPEG')
                        img_bytes.seek(0)
                        # Display the PDF preview
                        st.image(img_bytes, caption=f"PDF Preview: {uploaded_file.name}", use_container_width=True)
                    else:
                        st.info(f"PDF uploaded: {uploaded_file.name}")
                except Exception:
                    # The preview is best-effort: show the file name instead
                    # of an error message when it cannot be rendered
                    st.info(f"PDF uploaded: {uploaded_file.name}")
                    st.info("Click 'Process Document' to analyze the content.")
            else:
                st.image(uploaded_file, use_container_width=True)

            # Add image preprocessing preview in a collapsible section if needed
            if any(preprocessing_options.values()) and uploaded_file.type.startswith('image/'):
                with st.expander("Image Preprocessing Preview"):
                    preview_cols = st.columns(2)
                    with preview_cols[0]:
                        st.markdown("**Original Image**")
                        st.image(uploaded_file, use_container_width=True)
                    with preview_cols[1]:
                        st.markdown("**Preprocessed Image**")
                        try:
                            processed_bytes = preprocess_image(uploaded_file.getvalue(), preprocessing_options)
                            st.image(io.BytesIO(processed_bytes), use_container_width=True)
                        except Exception as e:
                            st.error(f"Error in preprocessing: {str(e)}")

        # Process button - flush left with similar padding as file browser
        with upload_col:
            process_button = st.button("Process Document", use_container_width=True)

        # Results section
        if process_button:
            try:
                # NOTE(review): the sidebar's pdf_dpi / max_pages values are
                # not forwarded; process_file() only accepts these three
                # arguments.
                result = process_file(uploaded_file, use_vision, preprocessing_options)

                # Create results tabs for better organization
                results_tab1, results_tab2 = st.tabs(["Document Analysis", "Technical Details"])

                with results_tab1:
                    # Create two columns for metadata and content
                    meta_col, content_col = st.columns([1, 2])

                    with meta_col:
                        st.subheader("Document Metadata")
                        if result.get('error'):
                            # process_file() reports failures such as the size
                            # limit through an "error" entry in the result
                            st.warning(f"**Document processed with errors:** {result['error']}")
                        else:
                            st.success("**Document processed successfully**")

                        # Display file info
                        st.write(f"**File Name:** {result.get('file_name', uploaded_file.name)}")

                        # Display info if only limited pages were processed
                        if 'limited_pages' in result:
                            st.info(f"Processed {result['limited_pages']['processed']} of {result['limited_pages']['total']} pages")

                        # Display languages if available
                        languages = [lang for lang in (result.get('languages') or []) if lang is not None]
                        if languages:
                            st.write(f"**Languages:** {', '.join(languages)}")

                        # Confidence score if available (skipped when it is
                        # not a number, since the percent format would raise)
                        confidence = result.get('confidence_score')
                        if isinstance(confidence, (int, float)):
                            st.write(f"**OCR Confidence:** {confidence:.1%}")

                        # Display topics if available, ignoring None entries
                        # the same way the languages list does
                        topics = [topic for topic in (result.get('topics') or []) if topic is not None]
                        if topics:
                            st.write(f"**Topics:** {', '.join(topics)}")

                    with content_col:
                        st.subheader("Document Contents")
                        if 'ocr_contents' in result:
                            # Check if there are images in the OCR result
                            has_images = False
                            if 'raw_response' in result:
                                try:
                                    has_images = any(page.images for page in result['raw_response'].pages)
                                except Exception:
                                    has_images = False

                            # Create tabs for different views
                            if has_images:
                                view_tab1, view_tab2, view_tab3 = st.tabs(["Structured View", "Raw JSON", "With Images"])
                            else:
                                view_tab1, view_tab2 = st.tabs(["Structured View", "Raw JSON"])

                            with view_tab1:
                                _render_structured_view(result['ocr_contents'])

                            with view_tab2:
                                # Show the raw JSON for developers
                                st.json(result)

                            if has_images:
                                with view_tab3:
                                    _render_images_view(result)
                        else:
                            st.error("No OCR content was extracted from the document.")

                with results_tab2:
                    st.subheader("Raw Processing Results")
                    st.json(result)
            except Exception as e:
                st.error(f"Error processing document: {str(e)}")
    else:
        # Display sample images in the main area when no file is uploaded
        st.info("Upload a document to get started using the file uploader above.")

        # Show example images in a grid
        st.subheader("Example Documents")
        with st.container():
            # Find sample images from the input directory to display
            input_dir = Path(__file__).parent / "input"
            sample_images = []
            if input_dir.exists():
                # Sorted so the same three samples appear in the same order
                # on every run; limited to 3 samples
                sample_images = sorted(input_dir.glob("*.jpg"))[:3]
            if sample_images:
                columns = st.columns(3)
                for column, img_path in zip(columns, sample_images):
                    with column:
                        st.image(str(img_path), caption=img_path.name, use_container_width=True)