Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pikepdf | |
| import os | |
| import zipfile | |
| import shutil | |
| from pathlib import Path | |
| import uuid | |
| from datetime import datetime, timedelta | |
| import logging | |
| import threading | |
| import time | |
| from typing import Tuple, List, Optional | |
# Logging: INFO and above go to the root handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Size policy (fixed values). A segment is grown until it reaches the target
# size; anything that ends up above the hard maximum is thrown away.
TARGET_SEGMENT_SIZE_MB = 4.5
MAX_ALLOWED_SIZE_MB = 5.0
TARGET_SEGMENT_SIZE_BYTES = int(TARGET_SEGMENT_SIZE_MB * (1024 * 1024))  # 4.5 MB in bytes
MAX_ALLOWED_SIZE_BYTES = int(MAX_ALLOWED_SIZE_MB * (1024 * 1024))  # 5 MB in bytes

# Working area for per-session files, relative to the current working
# directory, and how long a session may live before it is swept.
TEMP_DIR = Path("temp_files")
CLEANUP_AFTER_MINUTES = 10

# Make sure the working area exists before any session is created.
TEMP_DIR.mkdir(exist_ok=True)

# Registry of live sessions: session id -> {"created": datetime, "dir": Path}.
user_sessions = dict()
class PDFProcessor:
    """Split PDFs into size-bounded segments using pikepdf.

    A segment is grown one page at a time and re-saved after every addition,
    because the size of a page range is only known once it has actually been
    written to disk.
    """

    @staticmethod
    def _remove_temp_files(output_dir: Path) -> None:
        """Best-effort removal of leftover ``temp_segment_*.pdf`` work files."""
        for temp_file in output_dir.glob("temp_segment_*.pdf"):
            try:
                temp_file.unlink()
            except OSError as e:
                # Cleanup must never mask the real result or error.
                logger.debug(f"Could not remove temp file {temp_file}: {e}")

    @staticmethod
    def split_pdf_by_size(input_path: Path, output_dir: Path, progress_callback=None) -> Tuple[List[Path], dict]:
        """
        Split a PDF into consecutive segments of roughly TARGET_SEGMENT_SIZE_BYTES.

        Pages are added to the current segment until the saved file reaches the
        target size. A segment is only kept if it is at most
        MAX_ALLOWED_SIZE_BYTES; if adding a page pushes the file over that
        limit, the previous (smaller) version of the segment is used instead.
        A single page that is over the limit on its own is discarded.

        Args:
            input_path: PDF to split.
            output_dir: Existing directory that receives the segment files
                (and short-lived ``temp_segment_*.pdf`` work files).
            progress_callback: Optional ``callback(fraction, message)`` invoked
                at the start of each segment and once on completion.

        Returns:
            ``(kept_files, stats)`` where ``kept_files`` lists the segment
            paths in page order and ``stats`` holds page/segment counters and
            sizes in MB.

        Raises:
            Exception: any error from reading the input PDF or from the
                surrounding file handling is logged and re-raised. Errors
                while writing one trial segment are logged and end that
                segment only.
        """
        kept_files: List[Path] = []
        stats = {
            "total_pages": 0,
            "segments_created": 0,
            "segments_discarded": 0,
            "original_size_mb": 0,
            "total_output_size_mb": 0,
            "largest_segment_mb": 0,
            "smallest_segment_mb": float('inf')
        }
        try:
            stats["original_size_mb"] = input_path.stat().st_size / 1024 / 1024
            with pikepdf.open(input_path, suppress_warnings=True, attempt_recovery=True) as pdf:
                total_pages = len(pdf.pages)
                stats["total_pages"] = total_pages
                if total_pages == 0:
                    # Nothing to split; report 0 rather than the inf sentinel.
                    stats["smallest_segment_mb"] = 0
                    return kept_files, stats

                start_page = 0
                part = 1
                while start_page < total_pages:
                    if progress_callback:
                        progress_callback(
                            start_page / total_pages,
                            f"Processing segment {part}, starting at page {start_page + 1}...",
                        )

                    # Trial saves go to a separate candidate file. The accepted
                    # version lives in good_path and is only replaced once a
                    # larger candidate has been accepted, so a rejected
                    # (oversized) trial can never destroy the last good segment.
                    candidate_path = output_dir / f"temp_segment_{part}_candidate.pdf"
                    good_path = output_dir / f"temp_segment_{part}.pdf"
                    have_good = False
                    last_good_end = start_page
                    end_page = start_page

                    # NOTE: every trial rebuilds the segment from start_page,
                    # so the work is quadratic in the pages per segment.
                    while end_page < total_pages:
                        try:
                            # Context manager closes the trial PDF after saving.
                            with pikepdf.new() as segment_pdf:
                                for page_num in range(start_page, end_page + 1):
                                    segment_pdf.pages.append(pdf.pages[page_num])
                                segment_pdf.save(
                                    candidate_path,
                                    compress_streams=True,
                                    object_stream_mode=pikepdf.ObjectStreamMode.generate,
                                    linearize=False  # Don't linearize to save time during testing
                                )
                            segment_size = candidate_path.stat().st_size
                            segment_size_mb = segment_size / 1024 / 1024
                            logger.debug(f"Testing segment {part}: pages {start_page+1}-{end_page+1}, size: {segment_size_mb:.2f} MB")

                            if segment_size > MAX_ALLOWED_SIZE_BYTES:
                                # Over the hard limit: drop the trial and fall
                                # back to the last accepted version, if any.
                                candidate_path.unlink()
                                if end_page == start_page:
                                    logger.warning(f"Single page {start_page+1} exceeds 5MB limit - discarding")
                                    stats["segments_discarded"] += 1
                                break

                            # Accept the trial (Path.replace overwrites the
                            # previous accepted version).
                            candidate_path.replace(good_path)
                            have_good = True
                            last_good_end = end_page
                        except Exception as e:
                            logger.error(f"Error creating segment: {e}")
                            if candidate_path.exists():
                                candidate_path.unlink()
                            break

                        # Stop growing once the target size is reached or the
                        # document is exhausted.
                        if segment_size >= TARGET_SEGMENT_SIZE_BYTES or end_page == total_pages - 1:
                            break
                        end_page += 1

                    if have_good and good_path.exists():
                        final_filename = f"segment_{part:03d}_p{start_page+1}-{last_good_end+1}.pdf"
                        final_path = output_dir / final_filename
                        good_path.rename(final_path)
                        final_size = final_path.stat().st_size
                        final_size_mb = final_size / 1024 / 1024
                        if final_size <= MAX_ALLOWED_SIZE_BYTES:
                            kept_files.append(final_path)
                            stats["segments_created"] += 1
                            stats["total_output_size_mb"] += final_size_mb
                            stats["largest_segment_mb"] = max(stats["largest_segment_mb"], final_size_mb)
                            stats["smallest_segment_mb"] = min(stats["smallest_segment_mb"], final_size_mb)
                            logger.info(f"Created segment {part}: {final_size_mb:.2f} MB (pages {start_page+1}-{last_good_end+1})")
                        else:
                            # Defensive: accepted trials were already size-checked.
                            final_path.unlink()
                            stats["segments_discarded"] += 1
                            logger.warning(f"Final segment {part} exceeded 5MB limit after rename")

                    # Always advance by at least one page so the loop
                    # terminates even when a page was discarded or failed.
                    start_page = last_good_end + 1
                    part += 1

            PDFProcessor._remove_temp_files(output_dir)
            if stats["smallest_segment_mb"] == float('inf'):
                stats["smallest_segment_mb"] = 0
            if progress_callback:
                progress_callback(1.0, "Splitting complete!")
        except Exception as e:
            logger.error(f"Error splitting PDF: {str(e)}")
            PDFProcessor._remove_temp_files(output_dir)
            raise
        return kept_files, stats
class SessionManager:
    """Create per-request working directories and expire them after a timeout."""

    @staticmethod
    def create_session(session_id: str) -> Path:
        """Create the directory for ``session_id`` under TEMP_DIR and register it.

        Returns:
            The session directory path.
        """
        session_dir = TEMP_DIR / session_id
        # parents=True so a session can still be created if TEMP_DIR itself
        # was removed after start-up.
        session_dir.mkdir(parents=True, exist_ok=True)
        user_sessions[session_id] = {
            "created": datetime.now(),
            "dir": session_dir
        }
        return session_dir

    @staticmethod
    def cleanup_old_sessions():
        """Delete the directories of sessions older than CLEANUP_AFTER_MINUTES
        and drop them from the registry."""
        cutoff = datetime.now() - timedelta(minutes=CLEANUP_AFTER_MINUTES)
        # Iterate over a snapshot: this runs on the cleanup thread while
        # request handlers may be adding sessions, and iterating the live dict
        # would fail with "dictionary changed size during iteration".
        for session_id, session_info in list(user_sessions.items()):
            if session_info["created"] >= cutoff:
                continue
            try:
                shutil.rmtree(session_info["dir"], ignore_errors=True)
            except Exception as e:
                logger.error(f"Error cleaning session {session_id}: {e}")
                continue
            user_sessions.pop(session_id, None)
            logger.info(f"Cleaned up session: {session_id}")
# Background cleanup of expired sessions
def cleanup_worker():
    """Run SessionManager.cleanup_old_sessions once a minute, forever.

    Intended as the target of a background thread; it never returns.
    Errors from a sweep are logged and the loop carries on.
    """
    while True:
        try:
            SessionManager.cleanup_old_sessions()
        except Exception as e:
            logger.error(f"Cleanup error: {e}")
        # Sleep outside the try block so that a failing sweep is still
        # throttled to one attempt per minute instead of spinning.
        time.sleep(60)
# Start the sweeper as a daemon thread so it does not keep the process alive.
cleanup_thread = threading.Thread(target=cleanup_worker)
cleanup_thread.daemon = True
cleanup_thread.start()
def process_pdf(file_obj, progress=gr.Progress()) -> Tuple[Optional[str], str, str]:
    """
    Gradio handler: split an uploaded PDF and package the segments as a ZIP.

    Args:
        file_obj: The upload, as a file path string, an object with
            ``read()``, or raw bytes. ``None`` means nothing was uploaded.
        progress: Gradio progress tracker, called as ``progress(fraction, message)``.

    Returns:
        ``(zip_file_path, statistics_html, status_message)``. On failure, or
        when no segment could be kept, the path is ``None`` and the
        statistics are empty.
    """
    if file_obj is None:
        return None, "", "⚠️ Please upload a PDF file"

    session_id = str(uuid.uuid4())[:8]
    session_dir = SessionManager.create_session(session_id)
    try:
        progress(0.1, "Initializing...")

        # Copy the upload into the session directory.
        input_path = session_dir / "input.pdf"
        if isinstance(file_obj, str):
            shutil.copy(file_obj, input_path)
        else:
            payload = file_obj.read() if hasattr(file_obj, 'read') else file_obj
            with open(input_path, 'wb') as f:
                f.write(payload)

        # Opening the file fails early if it is not a readable PDF.
        progress(0.2, "Verifying PDF...")
        with pikepdf.open(input_path) as pdf:
            page_count = len(pdf.pages)
        logger.info(f"Valid PDF with {page_count} pages")

        output_dir = session_dir / "output"
        output_dir.mkdir(exist_ok=True)

        progress(0.3, "Splitting PDF into 4.5MB segments...")

        def update_progress(value, message):
            # Map the splitter's 0..1 range onto the 0.3..0.8 slice of the bar.
            scaled_progress = 0.3 + (value * 0.5)
            progress(scaled_progress, message)

        output_files, stats = PDFProcessor.split_pdf_by_size(
            input_path,
            output_dir,
            progress_callback=update_progress
        )
        if not output_files:
            return None, "", "❌ No valid segments created (all segments exceeded 5MB limit)"

        progress(0.9, "Creating ZIP archive...")
        zip_path = session_dir / f"pdf_segments_{session_id}.zip"
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in output_files:
                # Store segments flat, under their file name only.
                zipf.write(file_path, file_path.name)

        stats_html = f"""
        <div style="padding: 20px; background: #f0f9ff; border-radius: 10px; margin: 10px 0; border: 2px solid #0284c7;">
        <h3 style="color: #0c4a6e; margin-top: 0;">📊 Processing Results</h3>
        <table style="width: 100%; border-collapse: collapse; background: white; border-radius: 5px;">
        <tr style="border-bottom: 1px solid #e2e8f0;">
        <td style="padding: 10px; font-weight: bold; color: #334155;">📄 Total Pages:</td>
        <td style="padding: 10px; text-align: right; color: #475569; font-weight: 600;">{stats['total_pages']}</td>
        </tr>
        <tr style="border-bottom: 1px solid #e2e8f0; background: #f8fafc;">
        <td style="padding: 10px; font-weight: bold; color: #334155;">✅ Segments Created (≤5MB):</td>
        <td style="padding: 10px; text-align: right; color: #16a34a; font-weight: 600;">{stats['segments_created']}</td>
        </tr>
        <tr style="border-bottom: 1px solid #e2e8f0;">
        <td style="padding: 10px; font-weight: bold; color: #334155;">❌ Segments Discarded (>5MB):</td>
        <td style="padding: 10px; text-align: right; color: #dc2626; font-weight: 600;">{stats['segments_discarded']}</td>
        </tr>
        </table>
        <p style="margin-top: 15px; color: #059669; font-weight: bold;">
        ✨ Your file has been split successfully! Click the download button below.
        </p>
        </div>
        """
        progress(1.0, "Complete! 🎉")

        # The original upload is no longer needed once the ZIP exists.
        input_path.unlink()

        # gr.File expects the path as a plain string.
        return str(zip_path), stats_html, "✅ Processing complete! Your ZIP file is ready for download."
    except Exception as e:
        # Top-level boundary for the UI: log with traceback, remove the
        # session's files, and report the error text to the user.
        logger.exception(f"Processing error: {str(e)}")
        shutil.rmtree(session_dir, ignore_errors=True)
        return None, "", f"❌ Error: {str(e)}"
# NOTE: a first copy of the interface used to be built here and bound to
# `app`, but `app` (and every component name) is rebound by the gr.Blocks
# definition that follows, so that copy was never launched. Only the callback
# it defined is kept, so that the name stays available at module level.
def process_and_update(file_obj, progress=gr.Progress()):
    """Click handler that delegates to process_pdf.

    Returns:
        The ``(zip_path, stats_html, status_msg)`` tuple from process_pdf,
        unchanged, in the order expected by the File, HTML and Markdown
        output components.
    """
    zip_path, stats_html, status_msg = process_pdf(file_obj, progress)
    return zip_path, stats_html, status_msg
# Create Gradio interface with fixed theme
with gr.Blocks(
    title="PDF Splitter - Fast & Simple",
    theme=gr.themes.Base(),  # Using Base theme for better control
    css="""
    .gradio-container {
        max-width: 800px;
        margin: auto;
    }
    /* Fix text colors to ensure visibility */
    .markdown-text, .markdown-text p, .markdown-text h1, .markdown-text h2, .markdown-text h3 {
        color: #1f2937 !important;
    }
    /* Ensure download button is styled properly */
    .download-button, button[aria-label*="Download"] {
        background-color: #3b82f6 !important;
        color: white !important;
        cursor: pointer !important;
    }
    .download-button:hover, button[aria-label*="Download"]:hover {
        background-color: #2563eb !important;
    }
    /* Fix file component styling */
    .file-preview {
        background-color: #f3f4f6 !important;
        border: 1px solid #d1d5db !important;
    }
    /* Ensure all text is visible */
    label, .label-text {
        color: #374151 !important;
    }
    /* Status text visibility */
    .status-text {
        color: #1f2937 !important;
        font-weight: 500;
    }
    """
) as app:
    # Intro text. Blank lines separate the paragraphs, the list and the note
    # so Markdown does not merge them.
    gr.Markdown("""
    # 📄 PDF Splitter Tool

    **Split large PDFs into 4.5MB segments - Files over 5MB are automatically discarded!**

    This tool uses advanced compression with qpdf to split your PDF into segments of approximately **4.5 MB** each.
    Any segments that exceed **5 MB** are automatically discarded to ensure all output files meet size requirements.

    ### How to use:

    1. Upload your PDF file
    2. Click "Split PDF"
    3. Download the ZIP file containing only segments ≤5MB

    *Note: Files are automatically deleted after 10 minutes for your privacy.*
    """)

    # Upload + action button
    with gr.Row():
        with gr.Column():
            file_input = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="filepath",
                elem_classes="file-upload"
            )
            split_btn = gr.Button(
                "🚀 Split PDF into 4.5MB Segments",
                variant="primary",
                size="lg",
                elem_classes="split-button"
            )

    # Status line, statistics panel and download slot, one per row
    with gr.Row():
        status_text = gr.Markdown("Ready to process your PDF...", elem_classes="status-text")
    with gr.Row():
        stats_output = gr.HTML(elem_classes="stats-output")
    with gr.Row():
        download_file = gr.File(
            label="📦 Download ZIP (Contains only segments ≤5MB)",
            visible=True,
            elem_classes="download-section",
            interactive=False  # Output-only: users download from it, they do not upload to it
        )

    # process_pdf returns (zip path, stats HTML, status message), matching
    # the order of the outputs below.
    split_btn.click(
        fn=process_pdf,
        inputs=[file_input],
        outputs=[download_file, stats_output, status_text]
    )

    # Feature overview
    gr.Markdown("""
    ---

    ### 💡 Key Features:

    - ✅ **Target segment size: 4.5MB** - Optimized for most systems
    - ✅ **Maximum allowed size: 5MB** - Segments over 5MB are automatically discarded
    - ✅ **Smart splitting** - Adjusts page count per segment dynamically
    - ✅ **Compressed output** - Uses qpdf for efficient PDF compression
    - ✅ **Automatic cleanup** - Files deleted after 10 minutes
    - ✅ **Progress tracking** - Real-time updates during processing

    ### 🔒 Privacy & Security:

    - All uploaded files are automatically deleted after processing
    - No files are stored permanently on the server
    - Each user gets a unique session ID for file isolation

    ### ⚙️ Technical Details:

    - Uses **pikepdf** (qpdf wrapper) for efficient PDF manipulation
    - Maintains PDF compression without decompressing
    - Dynamically adjusts segment size based on page content
    - Automatically retries with fewer pages if segment exceeds limits
    """, elem_classes="features-section")
# Entry point: serve the interface when the file is run as a script.
if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",  # listen on all interfaces
        "server_port": 7860,
        "share": False,
        "show_error": True,
    }
    app.launch(**launch_options)