# PDF Splitter — Hugging Face Space (Gradio app)
| import gradio as gr | |
| import pikepdf | |
| import os | |
| import zipfile | |
| import shutil | |
| from pathlib import Path | |
| import uuid | |
| from datetime import datetime, timedelta | |
| import logging | |
| import threading | |
| import time | |
| from typing import Tuple, List, Optional | |
| import subprocess | |
| import json | |
# Configure logging for the whole module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configuration - FIXED VALUES
TARGET_SEGMENT_SIZE_MB = 4.5  # Target size for each segment
MAX_ALLOWED_SIZE_MB = 5.0  # Maximum allowed size - discard if larger
TARGET_SEGMENT_SIZE_BYTES = int(TARGET_SEGMENT_SIZE_MB * 1024 * 1024)  # 4.5MB in bytes
MAX_ALLOWED_SIZE_BYTES = int(MAX_ALLOWED_SIZE_MB * 1024 * 1024)  # 5MB in bytes
TEMP_DIR = Path("temp_files")  # root for per-session working directories
CLEANUP_AFTER_MINUTES = 10  # session TTL before the background reaper deletes it

# Create temp directory (idempotent across restarts)
TEMP_DIR.mkdir(exist_ok=True)

# Store user sessions for cleanup: session_id -> {"created": datetime, "dir": Path}
user_sessions = {}
class PDFProcessor:
    """Split PDFs into size-bounded segments by shelling out to qpdf.

    All methods are stateless and are called on the class
    (``PDFProcessor.get_pdf_info(...)``), so they are declared
    ``@staticmethod`` — the originals were missing both ``self`` and the
    decorator.
    """

    @staticmethod
    def get_pdf_info(pdf_path: Path) -> dict:
        """Return ``{"total_pages": N}`` for *pdf_path* via ``qpdf --show-npages``.

        Raises:
            subprocess.CalledProcessError: if qpdf cannot read the file.
        """
        try:
            result = subprocess.run(
                ["qpdf", "--show-npages", str(pdf_path)],
                capture_output=True,
                text=True,
                check=True
            )
            return {"total_pages": int(result.stdout.strip())}
        except subprocess.CalledProcessError as e:
            logger.error(f"Error getting PDF info: {e}")
            raise

    @staticmethod
    def split_pdf_by_size(input_path: Path, output_dir: Path,
                          progress_callback=None) -> Tuple[List[Path], dict]:
        """Split *input_path* into segments of roughly TARGET_SEGMENT_SIZE_MB.

        For each segment a binary search over the page count finds the
        largest 1-based page range whose qpdf output stays at or under
        MAX_ALLOWED_SIZE_BYTES. Segments that still exceed the hard cap
        (e.g. a single oversized page) are discarded, not returned.

        Args:
            input_path: source PDF on disk.
            output_dir: directory receiving ``segment_NNN_pA-B.pdf`` files.
            progress_callback: optional ``fn(fraction, message)``.

        Returns:
            (list of kept segment paths, stats dict).
        """
        kept_files: List[Path] = []
        stats = {
            "total_pages": 0,
            "segments_created": 0,
            "segments_discarded": 0,
            "original_size_mb": input_path.stat().st_size / 1024 / 1024,
            "total_output_size_mb": 0,
            "largest_segment_mb": 0,
            "smallest_segment_mb": float('inf')
        }
        try:
            pdf_info = PDFProcessor.get_pdf_info(input_path)
            total_pages = pdf_info["total_pages"]
            stats["total_pages"] = total_pages
            if total_pages == 0:
                return kept_files, stats
            logger.info(f"Starting split: {total_pages} pages, original size: {stats['original_size_mb']:.2f} MB")
            start_page = 1  # qpdf page ranges are 1-based
            part = 1
            while start_page <= total_pages:
                if progress_callback:
                    progress = ((start_page - 1) / total_pages)
                    progress_callback(progress, f"Processing segment {part}...")
                # Binary-search the largest end page whose segment fits.
                low = start_page
                high = min(start_page + 100, total_pages)  # probe at most 100 pages ahead
                best_end = start_page
                best_size = 0
                test_file = output_dir / f"test_{part}.pdf"
                while low <= high:
                    mid = (low + high) // 2
                    try:
                        subprocess.run(
                            ["qpdf", "--empty", "--pages", str(input_path), f"{start_page}-{mid}", "--", str(test_file)],
                            capture_output=True,
                            check=True,
                            timeout=10  # per-probe timeout
                        )
                        if test_file.exists():
                            size = test_file.stat().st_size
                            if size <= MAX_ALLOWED_SIZE_BYTES:
                                best_end = mid
                                best_size = size
                                if size < TARGET_SEGMENT_SIZE_BYTES * 0.9:  # under 90% of target
                                    low = mid + 1  # room left: try more pages
                                else:
                                    break  # close enough to target
                            else:
                                high = mid - 1  # too big: try fewer pages
                    except subprocess.CalledProcessError as e:
                        logger.error(f"qpdf error: {e}")
                        high = mid - 1
                    except subprocess.TimeoutExpired:
                        logger.error(f"qpdf timeout for pages {start_page}-{mid}")
                        high = mid - 1
                    finally:
                        # BUGFIX: the original skipped cleanup when breaking out
                        # on a good-enough size, leaking test_*.pdf probe files.
                        if test_file.exists():
                            test_file.unlink()
                # best_end is initialised to start_page, so a final segment is
                # always attempted; the size check below discards it when even
                # a single page exceeds the hard cap. (The original's dead
                # `else: start_page += 1` branch has been removed.)
                final_filename = f"segment_{part:03d}_p{start_page}-{best_end}.pdf"
                final_path = output_dir / final_filename
                try:
                    # Compression options moved before the page spec: qpdf's
                    # canonical position for options is ahead of the filenames.
                    subprocess.run(
                        ["qpdf", "--compress-streams=y", "--object-streams=generate",
                         "--empty", "--pages", str(input_path), f"{start_page}-{best_end}", "--",
                         str(final_path)],
                        capture_output=True,
                        check=True,
                        timeout=30
                    )
                    if final_path.exists():
                        final_size = final_path.stat().st_size
                        final_size_mb = final_size / 1024 / 1024
                        if final_size <= MAX_ALLOWED_SIZE_BYTES:
                            kept_files.append(final_path)
                            stats["segments_created"] += 1
                            stats["total_output_size_mb"] += final_size_mb
                            stats["largest_segment_mb"] = max(stats["largest_segment_mb"], final_size_mb)
                            stats["smallest_segment_mb"] = min(stats["smallest_segment_mb"], final_size_mb)
                            logger.info(f"Created segment {part}: {final_size_mb:.2f} MB (pages {start_page}-{best_end})")
                        else:
                            # Even the best range (possibly a single page) is over the cap.
                            final_path.unlink()
                            stats["segments_discarded"] += 1
                            logger.warning(f"Segment {part} exceeded 5MB limit - discarded")
                except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
                    logger.error(f"Error creating final segment: {e}")
                    if final_path.exists():
                        final_path.unlink()
                start_page = best_end + 1
                part += 1
            if stats["smallest_segment_mb"] == float('inf'):
                stats["smallest_segment_mb"] = 0  # no segment was kept
            if progress_callback:
                progress_callback(1.0, "Splitting complete!")
            logger.info(f"Completed: {stats['segments_created']} segments created, {stats['segments_discarded']} discarded")
        except Exception as e:
            logger.error(f"Error in split_pdf_by_size: {str(e)}")
            raise
        return kept_files, stats
class SessionManager:
    """Create per-user temp directories and reap ones past their TTL.

    Both methods are stateless and are invoked on the class
    (``SessionManager.create_session(...)``), so they are declared
    ``@staticmethod`` — the originals lacked both ``self`` and the decorator.
    """

    @staticmethod
    def create_session(session_id: str) -> Path:
        """Create a working directory for *session_id* and register it in
        the module-level ``user_sessions`` registry for later cleanup."""
        session_dir = TEMP_DIR / session_id
        session_dir.mkdir(exist_ok=True)
        user_sessions[session_id] = {
            "created": datetime.now(),
            "dir": session_dir
        }
        return session_dir

    @staticmethod
    def cleanup_old_sessions() -> None:
        """Delete session dirs older than CLEANUP_AFTER_MINUTES and drop
        their registry entries. Safe to call repeatedly."""
        current_time = datetime.now()
        sessions_to_remove = []
        for session_id, session_info in user_sessions.items():
            if current_time - session_info["created"] > timedelta(minutes=CLEANUP_AFTER_MINUTES):
                try:
                    shutil.rmtree(session_info["dir"], ignore_errors=True)
                    sessions_to_remove.append(session_id)
                    logger.info(f"Cleaned up session: {session_id}")
                except Exception as e:
                    logger.error(f"Error cleaning session {session_id}: {e}")
        # Deferred deletion: never mutate user_sessions while iterating it.
        for session_id in sessions_to_remove:
            del user_sessions[session_id]
# Start cleanup thread
def cleanup_worker():
    """Background loop: reap expired sessions roughly once a minute."""
    while True:
        try:
            SessionManager.cleanup_old_sessions()
        except Exception as e:
            logger.error(f"Cleanup error: {e}")
        # BUGFIX: sleep moved out of the try block — previously an exception
        # skipped the sleep, turning a persistent failure into a busy spin.
        time.sleep(60)

# Daemon thread: exits with the process, no join required.
cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True)
cleanup_thread.start()
def process_pdf(file_obj, progress=gr.Progress()) -> Tuple[Optional[str], str, str]:
    """
    Main processing function for the Gradio interface.

    Args:
        file_obj: the upload — either a filesystem path string (gr.File with
            type="filepath") or a file-like / bytes object.
        progress: Gradio progress tracker (injected by Gradio at call time).

    Returns:
        (zip_file_path, statistics_html, status_message);
        zip_file_path is None on failure.
    """
    if file_obj is None:
        return None, "", "β οΈ Please upload a PDF file"
    session_id = str(uuid.uuid4())[:8]
    session_dir = SessionManager.create_session(session_id)
    try:
        progress(0.1, "Initializing...")
        # Persist the upload into the session directory.
        input_path = session_dir / "input.pdf"
        # Handle both file path string and file object
        if isinstance(file_obj, str):
            shutil.copy(file_obj, input_path)
        else:
            with open(input_path, 'wb') as f:
                f.write(file_obj.read() if hasattr(file_obj, 'read') else file_obj)
        # Verify it's a valid PDF before shelling out to qpdf.
        progress(0.2, "Verifying PDF...")
        with pikepdf.open(input_path) as pdf:
            page_count = len(pdf.pages)
            logger.info(f"Valid PDF with {page_count} pages")
        output_dir = session_dir / "output"
        output_dir.mkdir(exist_ok=True)
        # Split PDF with size constraints
        progress(0.3, "Splitting PDF into 4.5MB segments...")

        def update_progress(value, message):
            # Map the splitter's 0..1 progress into this function's 0.3..0.8 band.
            scaled_progress = 0.3 + (value * 0.5)
            progress(scaled_progress, message)

        output_files, stats = PDFProcessor.split_pdf_by_size(
            input_path,
            output_dir,
            progress_callback=update_progress
        )
        if not output_files:
            return None, "", "β No valid segments created (all segments exceeded 5MB limit)"
        # Bundle the kept segments into one downloadable ZIP.
        progress(0.9, "Creating ZIP archive...")
        zip_filename = f"pdf_segments_{session_id}.zip"
        zip_path = session_dir / zip_filename
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file_path in output_files:
                zipf.write(file_path, file_path.name)
        if not zip_path.exists():
            raise Exception("ZIP file creation failed")
        logger.info(f"ZIP file created at: {zip_path}")
        logger.info(f"ZIP file size: {zip_path.stat().st_size / 1024 / 1024:.2f} MB")
        # Generate statistics with proper styling
        stats_html = f"""
        <div style="padding: 20px; background: #f0f9ff; border-radius: 10px; margin: 10px 0; border: 2px solid #0284c7;">
            <h3 style="color: #0c4a6e; margin-top: 0;">π Processing Results</h3>
            <table style="width: 100%; border-collapse: collapse; background: white; border-radius: 5px;">
                <tr style="border-bottom: 1px solid #e2e8f0;">
                    <td style="padding: 10px; font-weight: bold; color: #334155;">π Total Pages:</td>
                    <td style="padding: 10px; text-align: right; color: #475569; font-weight: 600;">{stats['total_pages']}</td>
                </tr>
                <tr style="border-bottom: 1px solid #e2e8f0; background: #f8fafc;">
                    <td style="padding: 10px; font-weight: bold; color: #334155;">β Segments Created (β€5MB):</td>
                    <td style="padding: 10px; text-align: right; color: #16a34a; font-weight: 600;">{stats['segments_created']}</td>
                </tr>
                <tr style="border-bottom: 1px solid #e2e8f0;">
                    <td style="padding: 10px; font-weight: bold; color: #334155;">β Segments Discarded (>5MB):</td>
                    <td style="padding: 10px; text-align: right; color: #dc2626; font-weight: 600;">{stats['segments_discarded']}</td>
                </tr>
            </table>
            <p style="margin-top: 15px; color: #059669; font-weight: bold;">
                β¨ Your file has been split successfully! Click the download button below.
            </p>
        </div>
        """
        progress(1.0, "Complete! π")
        # Clean up input file to save space; the ZIP stays until the session expires.
        input_path.unlink()
        # Return the actual file path as a string for gr.File to handle.
        return str(zip_path), stats_html, "β Processing complete! Your ZIP file is ready for download."
    except Exception as e:
        logger.error(f"Processing error: {str(e)}")
        # BUGFIX: was `try: rmtree except: pass` — the bare except was
        # redundant (ignore_errors=True already suppresses) and hid bugs.
        shutil.rmtree(session_dir, ignore_errors=True)
        # Also drop the registry entry so the reaper doesn't retry it.
        user_sessions.pop(session_id, None)
        return None, "", f"β Error: {str(e)}"
# Create Gradio interface with fixed download functionality.
# NOTE(review): this entire `with gr.Blocks(...)` section is shadowed by the
# second `with gr.Blocks(...)` definition further below, which rebinds `app`
# and all the component variables — only the second UI is ever launched.
# One of the two definitions should be deleted.
with gr.Blocks(
    title="PDF Splitter - Fast & Simple",
    theme=gr.themes.Base()
) as app:
    gr.Markdown("""
# π PDF Splitter Tool
**Split large PDFs into 4.5MB segments - Files over 5MB are automatically discarded!**
""")
    with gr.Row():
        with gr.Column():
            # Upload restricted to .pdf; "filepath" hands the handler a path string.
            file_input = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="filepath"
            )
            split_btn = gr.Button(
                "π Split PDF into 4.5MB Segments",
                variant="primary",
                size="lg"
            )
    with gr.Row():
        status_text = gr.Markdown("Ready to process your PDF...")
    with gr.Row():
        stats_output = gr.HTML()
    with gr.Row():
        # gr.File as an OUTPUT: type="filepath" + interactive=False so the
        # returned path renders as a download link.
        download_file = gr.File(
            label="π¦ Download ZIP (Contains only segments β€5MB)",
            visible=True,
            type="filepath",  # Must be filepath for output
            interactive=False,  # Non-interactive for output
            elem_id="download_output"  # Add ID for debugging
        )

    def process_pdf_wrapper(file_obj, progress=gr.Progress()):
        """Guard around process_pdf: re-check that the returned ZIP path
        exists before handing it to gr.File, and normalize it to a string."""
        if file_obj is None:
            return None, "", "β οΈ Please upload a PDF file"
        try:
            zip_path, stats_html, status_msg = process_pdf(file_obj, progress)
            if zip_path and Path(zip_path).exists():
                return str(zip_path), stats_html, status_msg
            else:
                return None, stats_html, "β Error: ZIP file was not created"
        except Exception as e:
            logger.error(f"Process wrapper error: {str(e)}")
            return None, "", f"β Error: {str(e)}"

    # Wire the button; outputs map 1:1 to the wrapper's 3-tuple.
    split_btn.click(
        fn=process_pdf_wrapper,
        inputs=[file_input],
        outputs=[download_file, stats_output, status_text],
        show_progress=True
    )
# Create Gradio interface with fixed theme.
# NOTE(review): this second definition rebinds `app`, making the first
# `gr.Blocks` section above dead code; this is the UI actually launched.
# Unlike the first UI, it wires the button directly to process_pdf (no
# wrapper) and its download gr.File omits type="filepath".
with gr.Blocks(
    title="PDF Splitter - Fast & Simple",
    theme=gr.themes.Base(),  # Using Base theme for better control
    css="""
    .gradio-container {
        max-width: 800px;
        margin: auto;
    }
    /* Fix text colors to ensure visibility */
    .markdown-text, .markdown-text p, .markdown-text h1, .markdown-text h2, .markdown-text h3 {
        color: #1f2937 !important;
    }
    /* Ensure download button is styled properly */
    .download-button, button[aria-label*="Download"] {
        background-color: #3b82f6 !important;
        color: white !important;
        cursor: pointer !important;
    }
    .download-button:hover, button[aria-label*="Download"]:hover {
        background-color: #2563eb !important;
    }
    /* Fix file component styling */
    .file-preview {
        background-color: #f3f4f6 !important;
        border: 1px solid #d1d5db !important;
    }
    /* Ensure all text is visible */
    label, .label-text {
        color: #374151 !important;
    }
    /* Status text visibility */
    .status-text {
        color: #1f2937 !important;
        font-weight: 500;
    }
    """
) as app:
    gr.Markdown("""
# π PDF Splitter Tool
**Split large PDFs into 4.5MB segments - Files over 5MB are automatically discarded!**
This tool uses advanced compression with qpdf to split your PDF into segments of approximately **4.5 MB** each.
Any segments that exceed **5 MB** are automatically discarded to ensure all output files meet size requirements.
### How to use:
1. Upload your PDF file
2. Click "Split PDF"
3. Download the ZIP file containing only segments β€5MB
*Note: Files are automatically deleted after 10 minutes for your privacy.*
""")
    with gr.Row():
        with gr.Column():
            file_input = gr.File(
                label="Upload PDF",
                file_types=[".pdf"],
                type="filepath",
                elem_classes="file-upload"
            )
            split_btn = gr.Button(
                "π Split PDF into 4.5MB Segments",
                variant="primary",
                size="lg",
                elem_classes="split-button"
            )
    with gr.Row():
        status_text = gr.Markdown("Ready to process your PDF...", elem_classes="status-text")
    with gr.Row():
        stats_output = gr.HTML(elem_classes="stats-output")
    with gr.Row():
        download_file = gr.File(
            label="π¦ Download ZIP (Contains only segments β€5MB)",
            visible=True,
            elem_classes="download-section",
            interactive=False  # Make it non-interactive until file is ready
        )
    # Handle processing: process_pdf returns (zip_path, stats_html, status),
    # mapped 1:1 onto the three output components below.
    split_btn.click(
        fn=process_pdf,
        inputs=[file_input],
        outputs=[download_file, stats_output, status_text]
    )
    # Add features with proper styling
    gr.Markdown("""
---
### π‘ Key Features:
- β **Target segment size: 4.5MB** - Optimized for most systems
- β **Maximum allowed size: 5MB** - Segments over 5MB are automatically discarded
- β **Smart splitting** - Adjusts page count per segment dynamically
- β **Compressed output** - Uses qpdf for efficient PDF compression
- β **Automatic cleanup** - Files deleted after 10 minutes
- β **Progress tracking** - Real-time updates during processing
### π Privacy & Security:
- All uploaded files are automatically deleted after processing
- No files are stored permanently on the server
- Each user gets a unique session ID for file isolation
### βοΈ Technical Details:
- Uses **pikepdf** (qpdf wrapper) for efficient PDF manipulation
- Maintains PDF compression without decompressing
- Dynamically adjusts segment size based on page content
- Automatically retries with fewer pages if segment exceeds limits
""", elem_classes="features-section")
# Launch the app (blocking call; serves the most recently bound `app`).
if __name__ == "__main__":
    app.launch(
        server_name="0.0.0.0",  # listen on all interfaces (container/Spaces friendly)
        server_port=7860,  # Gradio / HF Spaces default port
        share=False,  # no public share tunnel
        show_error=True  # surface handler tracebacks in the UI
    )