# (Hugging Face Spaces status banner captured along with the source; kept as a
# comment so the file remains valid Python.)
# Spaces: Paused
from flask import Flask, request, send_file, jsonify
from flask_cors import CORS
import fitz  # PyMuPDF
import os
import tempfile
import shutil
import io

app = Flask(__name__)
CORS(app)  # Allow cross-origin requests from any origin
def compress_pdf(input_path, output_path, quality=70, compression_level=4):
    """
    Compress a PDF using PyMuPDF, recompressing embedded JPEG images.

    Args:
        input_path: Path to the input PDF.
        output_path: Path to save the compressed PDF.
        quality: JPEG quality (0-100); lower means more compression.
        compression_level: Garbage-collection level passed to doc.save (1-4).

    Returns:
        True if compression succeeded, False otherwise.
    """
    try:
        doc = fitz.open(input_path)
        try:
            # Walk every image on every page; pages without images simply
            # contribute an empty list, so no separate "has images" pre-scan
            # is needed.
            for page_num in range(doc.page_count):
                page = doc[page_num]
                for img_index, img in enumerate(page.get_images(full=True)):
                    xref = img[0]
                    try:
                        base_image = doc.extract_image(xref)
                        image_bytes = base_image["image"]
                        # Only JPEGs are re-encoded; other formats are left alone.
                        if base_image["ext"] != "jpeg":
                            continue
                        pix = fitz.Pixmap(image_bytes)
                        # colorspace may be None (e.g. stencil masks); guard
                        # before reading .n. Convert CMYK/other >3-channel
                        # images to RGB so JPEG encoding is valid.
                        if pix.colorspace is not None and pix.colorspace.n > 3:
                            pix = fitz.Pixmap(fitz.csRGB, pix)
                        new_bytes = pix.tobytes(output="jpeg", jpg_quality=quality)
                        # Swap in the re-encoded image only if it is smaller.
                        # BUG FIX: the previous code used the private
                        # _deleteObject/_setObject pair, which does not
                        # correctly rewrite an image stream; replace_image
                        # (PyMuPDF 1.21+) is the supported API for this.
                        if len(new_bytes) < len(image_bytes):
                            page.replace_image(xref, stream=new_bytes)
                    except Exception as e:
                        print(f"Error processing image {img_index} on page {page_num}: {str(e)}")
                        # Continue with the next image
                        continue
            # Apply standard PDF compression options.
            doc.save(
                output_path,
                garbage=compression_level,  # 4 is max garbage collection
                deflate=True,               # Deflate-compress streams
                clean=True,                 # Clean document structure
                pretty=False,               # No pretty printing (smaller file)
                linear=True                 # Create a linearized PDF
            )
        finally:
            # Ensure the document handle is released even if save() raises.
            doc.close()
        return True
    except Exception as e:
        print(f"Error compressing PDF: {str(e)}")
        return False
def get_progressive_compression_settings(original_size_kb, target_kb):
    """
    Build a ladder of (quality, compression_level) attempts to try in order.

    The more aggressive the requested shrink (target relative to original),
    the lower the starting JPEG quality. Compression level is always 4.
    """
    ratio = target_kb / original_size_kb if original_size_kb else 1.0
    if ratio < 0.25:
        # Very aggressive: may noticeably affect readability at the low end.
        qualities = (40, 30, 20, 10)
    elif ratio < 0.5:
        # Aggressive compression.
        qualities = (60, 50, 40, 30)
    else:
        # Moderate compression.
        qualities = (80, 70, 60, 50)
    return [(q, 4) for q in qualities]
def home():
    """Health-check handler: report that the API is up.

    NOTE(review): no @app.route decorator is visible in this capture; route
    registration was likely lost when the source was scraped -- confirm
    against the deployed app.
    """
    return "PDF Compressor API is running"
def compress():
    """
    Compress an uploaded PDF toward a requested target size.

    Expects multipart form data:
      * 'pdf'         -- the PDF file to compress.
      * 'target_size' -- desired size, e.g. "2mb", "500kb", or a bare number
                         interpreted as KB.

    Returns the compressed PDF as an attachment, or a JSON payload (warning
    when the target could not be reached, error on failure).

    NOTE(review): no @app.route decorator is visible in this capture; route
    registration was likely lost when the source was scraped -- confirm
    against the deployed app.
    """
    # Track temporary directories so they can be removed when done.
    temp_dirs = []

    def cleanup():
        # Best-effort removal of every temp dir created for this request.
        for dir_path in temp_dirs:
            try:
                if os.path.exists(dir_path):
                    shutil.rmtree(dir_path)
                    print(f"Cleaned up temporary directory: {dir_path}")
            except Exception as e:
                print(f"Error cleaning up {dir_path}: {str(e)}")

    try:
        if 'pdf' not in request.files:
            return jsonify({"error": "No PDF file uploaded."}), 400
        target_size_str = request.form.get("target_size")
        if not target_size_str:
            return jsonify({"error": "Target size not specified."}), 400

        # Parse target size: "Xmb" -> KB, "Xkb" -> KB, bare number -> KB.
        try:
            if target_size_str.lower().endswith("mb"):
                target_kb = float(target_size_str[:-2]) * 1024
            elif target_size_str.lower().endswith("kb"):
                target_kb = float(target_size_str[:-2])
            else:
                target_kb = float(target_size_str)
        except ValueError:
            return jsonify({"error": "Invalid target size format."}), 400

        # Create a per-request temporary working directory.
        temp_dir = tempfile.mkdtemp()
        temp_dirs.append(temp_dir)

        pdf_file = request.files['pdf']
        input_path = os.path.join(temp_dir, "input.pdf")
        output_path = os.path.join(temp_dir, "compressed.pdf")
        pdf_file.save(input_path)

        original_size_kb = os.path.getsize(input_path) / 1024
        print(f"Original size: {original_size_kb:.2f} KB, Target: {target_kb:.2f} KB")

        if original_size_kb <= target_kb:
            # Already small enough -- pass the file through unchanged.
            shutil.copy(input_path, output_path)
            print("Original file already meets target size")
            final_size_kb = original_size_kb
        else:
            # Try increasingly aggressive settings until the target is met
            # or the ladder is exhausted, keeping the smallest result seen.
            compression_settings = get_progressive_compression_settings(original_size_kb, target_kb)
            best_size_kb = original_size_kb
            best_output_path = input_path
            for quality, compression_level in compression_settings:
                temp_output = os.path.join(temp_dir, f"temp_q{quality}_c{compression_level}.pdf")
                print(f"Trying compression with quality={quality}, level={compression_level}")
                compress_pdf(input_path, temp_output, quality, compression_level)
                # BUG FIX: compress_pdf returns False on failure and may leave
                # no output file; skip it instead of crashing on getsize().
                if not os.path.exists(temp_output):
                    continue
                current_size_kb = os.path.getsize(temp_output) / 1024
                print(f"Result: {current_size_kb:.2f} KB")
                if current_size_kb < best_size_kb:
                    best_size_kb = current_size_kb
                    best_output_path = temp_output
                if current_size_kb <= target_kb:
                    print(f"Target reached with quality={quality}, level={compression_level}")
                    break
            shutil.copy(best_output_path, output_path)
            final_size_kb = best_size_kb
            # If compression made things worse, fall back to the original.
            if final_size_kb > original_size_kb:
                print("Compression ineffective, using original file")
                shutil.copy(input_path, output_path)
                final_size_kb = original_size_kb

        compression_ratio = 100 * (1 - final_size_kb / original_size_kb)

        # Target missed: warn JSON clients (browsers send Accept: */*, so in
        # practice this branch returns JSON for most clients).
        if final_size_kb > target_kb:
            accepts = request.headers.get('Accept', '')
            if 'application/json' in accepts or '*/*' in accepts:
                compression_analysis = ""
                if compression_ratio <= 0:
                    compression_analysis = (
                        "Your PDF may already be highly optimized or contain mostly "
                        "vector graphics/text which don't compress well. "
                        "Consider a higher target size."
                    )
                elif compression_ratio < 10:
                    compression_analysis = (
                        "Limited compression achieved. This PDF may contain pre-compressed "
                        "images or be mostly text/vector content. Consider a higher target size."
                    )
                payload = {
                    "warning": f"Unable to compress below {target_kb:.2f} KB. Best compressed size is {int(final_size_kb)} KB.",
                    "original_size_kb": round(original_size_kb, 2),
                    "compressed_size_kb": round(final_size_kb, 2),
                    "target_size_kb": round(target_kb, 2),
                    "compression_ratio": round(compression_ratio, 2),
                    "technical_details": compression_analysis,
                    "download_available": True
                }
                # BUG FIX: this path previously returned without removing the
                # temp dirs, leaking one directory per request.
                cleanup()
                return jsonify(payload), 200

        # Return the compressed file.
        response = send_file(output_path, as_attachment=True, download_name="compressed.pdf")
        # BUG FIX: cleanup() was previously defined but never registered, so
        # temp dirs were never removed; run it once the response is closed.
        response.call_on_close(cleanup)
        return response
    except Exception as e:
        # Ensure best-effort cleanup in case of errors.
        cleanup()
        print(f"Error in compression endpoint: {str(e)}")
        return jsonify({"error": f"An error occurred: {str(e)}"}), 500
if __name__ == "__main__":
    # Listen on all interfaces, port 7860.
    app.run(host="0.0.0.0", port=7860)