# pdf-split / app.py
# Source: Hugging Face Space "bluenevus" — commit dbcc72f (verified), "Update app.py", 21.5 kB
# (Scraped page chrome from the HF file viewer converted to this comment so the file parses.)
import gradio as gr
import pikepdf
import os
import zipfile
import shutil
from pathlib import Path
import uuid
from datetime import datetime, timedelta
import logging
import threading
import time
from typing import Tuple, List, Optional
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration - FIXED VALUES
TARGET_SEGMENT_SIZE_MB = 4.5  # Target size for each segment
MAX_ALLOWED_SIZE_MB = 5.0  # Maximum allowed size - discard if larger
TARGET_SEGMENT_SIZE_BYTES = int(TARGET_SEGMENT_SIZE_MB * 1024 * 1024)  # 4.5MB in bytes
MAX_ALLOWED_SIZE_BYTES = int(MAX_ALLOWED_SIZE_MB * 1024 * 1024)  # 5MB in bytes
TEMP_DIR = Path("temp_files")  # root directory for per-session working dirs
CLEANUP_AFTER_MINUTES = 10  # session lifetime before background deletion

# Create temp directory (idempotent on restart)
TEMP_DIR.mkdir(exist_ok=True)

# Store user sessions for cleanup: session_id -> {"created": datetime, "dir": Path}
user_sessions = {}
class PDFProcessor:
    """Handle PDF splitting with qpdf/pikepdf - using incremental size checking like bash script"""

    @staticmethod
    def split_pdf_by_size(input_path: Path, output_dir: Path, progress_callback=None) -> Tuple[List[Path], dict]:
        """
        Split PDF into segments of approximately 4.5MB, discarding any over 5MB.

        Grows each segment one page at a time, re-saving and measuring after every
        added page (same incremental approach as the original bash script).

        Args:
            input_path: Source PDF file on disk.
            output_dir: Directory that receives the final segment files.
            progress_callback: Optional ``fn(fraction, message)`` invoked for UI updates.

        Returns:
            Tuple ``(kept_files, stats)``: segment paths that fit under the 5MB
            ceiling, plus a summary dict (page/segment counts, sizes in MB).

        Raises:
            Re-raises any pikepdf/IO error after removing leftover temp files.
        """
        kept_files = []
        stats = {
            "total_pages": 0,
            "segments_created": 0,
            "segments_discarded": 0,
            "original_size_mb": 0,
            "total_output_size_mb": 0,
            "largest_segment_mb": 0,
            "smallest_segment_mb": float('inf')
        }
        try:
            # Get original file size
            stats["original_size_mb"] = input_path.stat().st_size / 1024 / 1024
            # Open PDF with pikepdf (attempt_recovery tolerates mildly damaged files)
            with pikepdf.open(input_path, suppress_warnings=True, attempt_recovery=True) as pdf:
                total_pages = len(pdf.pages)
                stats["total_pages"] = total_pages
                if total_pages == 0:
                    return kept_files, stats

                start_page = 0
                part = 1
                while start_page < total_pages:
                    end_page = start_page
                    last_good_segment = None
                    last_good_end = start_page

                    if progress_callback:
                        progress_callback(start_page / total_pages,
                                          f"Processing segment {part}, starting at page {start_page + 1}...")

                    # Keep adding pages until we exceed the size limit.
                    while end_page < total_pages:
                        # BUGFIX: each probe gets a unique filename. The previous code
                        # reused "temp_segment_{part}.pdf" for every probe, so unlinking
                        # the "previous good segment" deleted the file that had just been
                        # written (identical path) and segments were silently lost when
                        # finalizing or when falling back after an oversized probe.
                        temp_path = output_dir / f"temp_segment_{part}_{end_page}.pdf"
                        try:
                            # Build a candidate PDF from pages start_page..end_page (inclusive)
                            segment_pdf = pikepdf.new()
                            for page_num in range(start_page, end_page + 1):
                                segment_pdf.pages.append(pdf.pages[page_num])
                            # Save with compression
                            segment_pdf.save(
                                temp_path,
                                compress_streams=True,
                                object_stream_mode=pikepdf.ObjectStreamMode.generate,
                                linearize=False  # Don't linearize to save time during testing
                            )

                            segment_size = temp_path.stat().st_size
                            segment_size_mb = segment_size / 1024 / 1024
                            logger.debug(f"Testing segment {part}: pages {start_page+1}-{end_page+1}, size: {segment_size_mb:.2f} MB")

                            if segment_size < TARGET_SEGMENT_SIZE_BYTES:
                                # Still under target: promote this probe to "last good"
                                # and try to squeeze in one more page.
                                if last_good_segment and last_good_segment.exists():
                                    last_good_segment.unlink()  # drop the superseded probe
                                last_good_segment = temp_path
                                last_good_end = end_page
                                if end_page == total_pages - 1:
                                    break  # ran out of pages; this is the final segment
                                end_page += 1
                            elif segment_size <= MAX_ALLOWED_SIZE_BYTES:
                                # Between 4.5MB and 5MB - acceptable, stop growing here.
                                if last_good_segment and last_good_segment.exists():
                                    last_good_segment.unlink()
                                last_good_segment = temp_path
                                last_good_end = end_page
                                break
                            else:
                                # Over the 5MB hard ceiling - discard this probe.
                                temp_path.unlink()
                                if end_page == start_page:
                                    # A single page alone exceeds 5MB: skip that page.
                                    logger.warning(f"Single page {start_page+1} exceeds 5MB limit - discarding")
                                    stats["segments_discarded"] += 1
                                    last_good_end = start_page  # advance past this page
                                    break
                                else:
                                    # Fall back to the last good (smaller) segment.
                                    break
                        except Exception as e:
                            logger.error(f"Error creating segment: {e}")
                            if temp_path.exists():
                                temp_path.unlink()
                            break

                    # Finalize this part: rename the surviving probe to its final name.
                    if last_good_segment and last_good_segment.exists():
                        final_filename = f"segment_{part:03d}_p{start_page+1}-{last_good_end+1}.pdf"
                        final_path = output_dir / final_filename
                        last_good_segment.rename(final_path)

                        final_size = final_path.stat().st_size
                        final_size_mb = final_size / 1024 / 1024
                        if final_size <= MAX_ALLOWED_SIZE_BYTES:
                            kept_files.append(final_path)
                            stats["segments_created"] += 1
                            stats["total_output_size_mb"] += final_size_mb
                            stats["largest_segment_mb"] = max(stats["largest_segment_mb"], final_size_mb)
                            stats["smallest_segment_mb"] = min(stats["smallest_segment_mb"], final_size_mb)
                            logger.info(f"Created segment {part}: {final_size_mb:.2f} MB (pages {start_page+1}-{last_good_end+1})")
                        else:
                            # Should not happen (probes are size-checked), but be safe.
                            final_path.unlink()
                            stats["segments_discarded"] += 1
                            logger.warning(f"Final segment {part} exceeded 5MB limit after rename")

                    # Move to next segment
                    start_page = last_good_end + 1
                    part += 1

                # Clean up any probe files left behind (e.g. after an inner error).
                for temp_file in output_dir.glob("temp_segment_*.pdf"):
                    try:
                        temp_file.unlink()
                    except OSError:
                        pass

                if stats["smallest_segment_mb"] == float('inf'):
                    stats["smallest_segment_mb"] = 0  # no segments were kept

                if progress_callback:
                    progress_callback(1.0, "Splitting complete!")
        except Exception as e:
            logger.error(f"Error splitting PDF: {str(e)}")
            # Clean up temp files on error, then propagate to the caller.
            for temp_file in output_dir.glob("temp_segment_*.pdf"):
                try:
                    temp_file.unlink()
                except OSError:
                    pass
            raise
        return kept_files, stats
class SessionManager:
    """Create per-user temp directories and purge ones past their lifetime."""

    @staticmethod
    def create_session(session_id: str) -> Path:
        """Make (or reuse) a working directory for this session and register it."""
        session_dir = TEMP_DIR / session_id
        session_dir.mkdir(exist_ok=True)
        user_sessions[session_id] = {"created": datetime.now(), "dir": session_dir}
        return session_dir

    @staticmethod
    def cleanup_old_sessions():
        """Delete session directories created before the retention cutoff."""
        cutoff = datetime.now() - timedelta(minutes=CLEANUP_AFTER_MINUTES)
        expired = []
        for sid, info in user_sessions.items():
            if info["created"] < cutoff:
                try:
                    shutil.rmtree(info["dir"], ignore_errors=True)
                    expired.append(sid)
                    logger.info(f"Cleaned up session: {sid}")
                except Exception as e:
                    logger.error(f"Error cleaning session {sid}: {e}")
        # Drop registry entries only after iteration to avoid mutating mid-loop.
        for sid in expired:
            del user_sessions[sid]
# Start cleanup thread
def cleanup_worker():
    """Background thread for cleaning old files.

    Runs forever: sweeps expired sessions, then sleeps one minute. Errors are
    logged and swallowed so the loop never dies.
    """
    while True:
        try:
            SessionManager.cleanup_old_sessions()
            time.sleep(60)  # Check every minute
        except Exception as e:
            logger.error(f"Cleanup error: {e}")

# daemon=True so this thread never blocks interpreter shutdown
cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
cleanup_thread.start()
def process_pdf(file_obj, progress=gr.Progress()) -> Tuple[Optional[str], str, str]:
    """
    Main processing function for Gradio interface.

    Args:
        file_obj: Path string from ``gr.File(type="filepath")``, or a raw
            bytes / file-like object from older Gradio versions.
        progress: Gradio progress tracker injected by the framework.

    Returns:
        (zip_file_path, statistics_html, status_message); ``zip_file_path`` is
        ``None`` on failure so the download component is cleared.
    """
    if file_obj is None:
        return None, "", "⚠️ Please upload a PDF file"

    session_id = str(uuid.uuid4())[:8]
    session_dir = SessionManager.create_session(session_id)
    try:
        progress(0.1, "Initializing...")

        # Save uploaded file into the session dir. gr.File(type="filepath")
        # hands us a path string; fall back to raw bytes / file-like objects.
        input_path = session_dir / "input.pdf"
        if isinstance(file_obj, str):
            shutil.copy(file_obj, input_path)
        else:
            with open(input_path, 'wb') as f:
                f.write(file_obj.read() if hasattr(file_obj, 'read') else file_obj)

        # Fail fast on corrupt / non-PDF uploads before any heavy work.
        progress(0.2, "Verifying PDF...")
        with pikepdf.open(input_path) as pdf:
            page_count = len(pdf.pages)
            logger.info(f"Valid PDF with {page_count} pages")

        # Create output directory
        output_dir = session_dir / "output"
        output_dir.mkdir(exist_ok=True)

        # Split PDF with size constraints
        progress(0.3, "Splitting PDF into 4.5MB segments...")

        def update_progress(value, message):
            # Map the splitter's 0-1 progress into the 0.3-0.8 band of the UI bar.
            progress(0.3 + (value * 0.5), message)

        output_files, stats = PDFProcessor.split_pdf_by_size(
            input_path,
            output_dir,
            progress_callback=update_progress
        )

        if not output_files:
            return None, "", "❌ No valid segments created (all segments exceeded 5MB limit)"

        # Create ZIP file
        progress(0.9, "Creating ZIP archive...")
        zip_filename = f"pdf_segments_{session_id}.zip"
        zip_path = session_dir / zip_filename
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in output_files:
                zipf.write(file_path, file_path.name)

        # Generate statistics with proper styling
        stats_html = f"""
        <div style="padding: 20px; background: #f0f9ff; border-radius: 10px; margin: 10px 0; border: 2px solid #0284c7;">
            <h3 style="color: #0c4a6e; margin-top: 0;">📊 Processing Results</h3>
            <table style="width: 100%; border-collapse: collapse; background: white; border-radius: 5px;">
                <tr style="border-bottom: 1px solid #e2e8f0;">
                    <td style="padding: 10px; font-weight: bold; color: #334155;">📄 Total Pages:</td>
                    <td style="padding: 10px; text-align: right; color: #475569; font-weight: 600;">{stats['total_pages']}</td>
                </tr>
                <tr style="border-bottom: 1px solid #e2e8f0; background: #f8fafc;">
                    <td style="padding: 10px; font-weight: bold; color: #334155;">✅ Segments Created (≤5MB):</td>
                    <td style="padding: 10px; text-align: right; color: #16a34a; font-weight: 600;">{stats['segments_created']}</td>
                </tr>
                <tr style="border-bottom: 1px solid #e2e8f0;">
                    <td style="padding: 10px; font-weight: bold; color: #334155;">❌ Segments Discarded (>5MB):</td>
                    <td style="padding: 10px; text-align: right; color: #dc2626; font-weight: 600;">{stats['segments_discarded']}</td>
                </tr>
            </table>
            <p style="margin-top: 15px; color: #059669; font-weight: bold;">
                ✨ Your file has been split successfully! Click the download button below.
            </p>
        </div>
        """

        progress(1.0, "Complete! 🎉")

        # Clean up input file to save space (segments + zip stay for download)
        input_path.unlink()

        # IMPORTANT: Return the actual file path as a string for gr.File to handle
        return str(zip_path), stats_html, "✅ Processing complete! Your ZIP file is ready for download."
    except Exception as e:
        # logger.exception keeps the traceback (the old logger.error lost it).
        logger.exception(f"Processing error: {str(e)}")
        # FIX: ignore_errors=True already makes rmtree best-effort; the old
        # bare `except: pass` wrapper was redundant and hid real bugs.
        shutil.rmtree(session_dir, ignore_errors=True)
        return None, "", f"❌ Error: {str(e)}"
# NOTE(review): a duplicate `with gr.Blocks(...) as app:` UI definition used to
# live here. It was immediately shadowed by the second `gr.Blocks` context below,
# which rebinds `app` and every component (file_input, split_btn, status_text,
# stats_output, download_file) before launch — so this first UI, its
# process_and_update wrapper, and its click handler were dead code and could
# never run. Removed; the live interface follows.
# Create Gradio interface with fixed theme
# NOTE(review): this `with` block binds `app`, the object launched in __main__.
with gr.Blocks(
    title="PDF Splitter - Fast & Simple",
    theme=gr.themes.Base(),  # Using Base theme for better control
    css="""
    .gradio-container {
        max-width: 800px;
        margin: auto;
    }
    /* Fix text colors to ensure visibility */
    .markdown-text, .markdown-text p, .markdown-text h1, .markdown-text h2, .markdown-text h3 {
        color: #1f2937 !important;
    }
    /* Ensure download button is styled properly */
    .download-button, button[aria-label*="Download"] {
        background-color: #3b82f6 !important;
        color: white !important;
        cursor: pointer !important;
    }
    .download-button:hover, button[aria-label*="Download"]:hover {
        background-color: #2563eb !important;
    }
    /* Fix file component styling */
    .file-preview {
        background-color: #f3f4f6 !important;
        border: 1px solid #d1d5db !important;
    }
    /* Ensure all text is visible */
    label, .label-text {
        color: #374151 !important;
    }
    /* Status text visibility */
    .status-text {
        color: #1f2937 !important;
        font-weight: 500;
    }
    """
) as app:
    # Header / instructions shown above the controls.
    gr.Markdown("""
    # 📄 PDF Splitter Tool
    **Split large PDFs into 4.5MB segments - Files over 5MB are automatically discarded!**
    This tool uses advanced compression with qpdf to split your PDF into segments of approximately **4.5 MB** each.
    Any segments that exceed **5 MB** are automatically discarded to ensure all output files meet size requirements.
    ### How to use:
    1. Upload your PDF file
    2. Click "Split PDF"
    3. Download the ZIP file containing only segments ≤5MB
    *Note: Files are automatically deleted after 10 minutes for your privacy.*
    """)

    with gr.Row():
        with gr.Column():
            # Upload control; type="filepath" makes process_pdf receive a path string.
            file_input = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="filepath",
                elem_classes="file-upload"
            )
            split_btn = gr.Button(
                "🚀 Split PDF into 4.5MB Segments",
                variant="primary",
                size="lg",
                elem_classes="split-button"
            )

    with gr.Row():
        # Free-text status line updated by process_pdf's third return value.
        status_text = gr.Markdown("Ready to process your PDF...", elem_classes="status-text")

    with gr.Row():
        # Receives the styled stats table (second return value).
        stats_output = gr.HTML(elem_classes="stats-output")

    with gr.Row():
        # Download target for the result ZIP (first return value, a file path).
        download_file = gr.File(
            label="📦 Download ZIP (Contains only segments ≤5MB)",
            visible=True,
            elem_classes="download-section",
            interactive=False  # Make it non-interactive until file is ready
        )

    # Handle processing: outputs map 1:1 onto process_pdf's 3-tuple return.
    split_btn.click(
        fn=process_pdf,
        inputs=[file_input],
        outputs=[download_file, stats_output, status_text]
    )

    # Add features with proper styling
    gr.Markdown("""
    ---
    ### 💡 Key Features:
    - ✅ **Target segment size: 4.5MB** - Optimized for most systems
    - ✅ **Maximum allowed size: 5MB** - Segments over 5MB are automatically discarded
    - ✅ **Smart splitting** - Adjusts page count per segment dynamically
    - ✅ **Compressed output** - Uses qpdf for efficient PDF compression
    - ✅ **Automatic cleanup** - Files deleted after 10 minutes
    - ✅ **Progress tracking** - Real-time updates during processing
    ### 🔒 Privacy & Security:
    - All uploaded files are automatically deleted after processing
    - No files are stored permanently on the server
    - Each user gets a unique session ID for file isolation
    ### ⚙️ Technical Details:
    - Uses **pikepdf** (qpdf wrapper) for efficient PDF manipulation
    - Maintains PDF compression without decompressing
    - Dynamically adjusts segment size based on page content
    - Automatically retries with fewer pages if segment exceeds limits
    """, elem_classes="features-section")
# Launch the app
if __name__ == "__main__":
    app.launch(
        server_name="0.0.0.0",  # bind all interfaces (needed inside containers)
        server_port=7860,       # Gradio's conventional port
        share=False,            # no public tunnel
        show_error=True         # surface tracebacks in the UI for debugging
    )