Spaces:
Paused
Paused
| import dash | |
| from dash import dcc, html, Input, Output, State, callback | |
| import dash_bootstrap_components as dbc | |
| from dash.exceptions import PreventUpdate | |
| import pikepdf | |
| import requests | |
| import io | |
| import tempfile | |
| import os | |
| import base64 | |
| import threading | |
| app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) | |
| app.layout = dbc.Container([ | |
| html.H1("PDF Compressor", className="my-4"), | |
| dbc.Card([ | |
| dbc.CardBody([ | |
| dcc.Upload( | |
| id='upload-pdf', | |
| children=html.Div([ | |
| 'Drag and Drop or ', | |
| html.A('Select PDF File') | |
| ]), | |
| style={ | |
| 'width': '100%', | |
| 'height': '60px', | |
| 'lineHeight': '60px', | |
| 'borderWidth': '1px', | |
| 'borderStyle': 'dashed', | |
| 'borderRadius': '5px', | |
| 'textAlign': 'center', | |
| 'margin': '10px' | |
| }, | |
| multiple=False | |
| ), | |
| dbc.Alert(id="upload-status", is_open=False, duration=4000, className="mt-3"), | |
| dbc.Input(id="url-input", placeholder="Or enter PDF URL", type="text", className="mt-3"), | |
| dbc.Button("Compress PDF", id="compress-btn", color="primary", className="mt-3"), | |
| dbc.Spinner( | |
| html.Div(id="compression-status", className="mt-3"), | |
| color="primary", | |
| type="border", | |
| fullscreen=True, | |
| fullscreen_style={"backgroundColor": "rgba(0, 0, 0, 0.3)"}, | |
| ), | |
| dcc.Download(id="download-pdf"), | |
| ]) | |
| ]), | |
| ]) | |
| def compress_pdf(input_file, url): | |
| if input_file is None and (url is None or url.strip() == ""): | |
| return None, "Please provide either a file or a URL." | |
| if input_file is not None and url and url.strip() != "": | |
| return None, "Please provide either a file or a URL, not both." | |
| try: | |
| if url and url.strip() != "": | |
| response = requests.get(url) | |
| response.raise_for_status() | |
| pdf_content = io.BytesIO(response.content) | |
| initial_size = len(response.content) | |
| else: | |
| content_type, content_string = input_file.split(',') | |
| decoded = base64.b64decode(content_string) | |
| pdf_content = io.BytesIO(decoded) | |
| initial_size = len(decoded) | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file: | |
| temp_file_path = temp_file.name | |
| pdf = pikepdf.Pdf.open(pdf_content) | |
| compression_params = dict(compress_streams=True, object_stream_mode=pikepdf.ObjectStreamMode.generate) | |
| pdf.save(temp_file_path, **compression_params) | |
| compressed_size = os.path.getsize(temp_file_path) | |
| compression_ratio = compressed_size / initial_size | |
| compression_percentage = (1 - compression_ratio) * 100 | |
| if compression_ratio >= 1 or compression_percentage < 5: | |
| os.remove(temp_file_path) | |
| return None, f"Unable to compress the PDF effectively. Original file returned. (Attempted compression: {compression_percentage:.2f}%)" | |
| return temp_file_path, f"PDF compressed successfully! Compression achieved: {compression_percentage:.2f}%" | |
| except Exception as e: | |
| return None, f"Error compressing PDF: {str(e)}" | |
| def update_upload_status(filename, contents): | |
| if filename is not None and contents is not None: | |
| return f"File uploaded: {filename}", True, "success" | |
| return "", False, "primary" | |
| def process_compress_and_download(n_clicks, file_content, url): | |
| if file_content is None and (url is None or url.strip() == ""): | |
| return "Please provide either a file or a URL.", None | |
| def compression_thread(): | |
| nonlocal file_content, url | |
| output_file, message = compress_pdf(file_content, url) | |
| if output_file: | |
| with open(output_file, "rb") as file: | |
| compressed_content = file.read() | |
| os.remove(output_file) | |
| return message, dcc.send_bytes(compressed_content, "compressed.pdf") | |
| else: | |
| return message, None | |
| thread = threading.Thread(target=compression_thread) | |
| thread.start() | |
| thread.join() | |
| return compression_thread() | |
| if __name__ == '__main__': | |
| print("Starting the Dash application...") | |
| app.run(debug=True, host='0.0.0.0', port=7860) | |
| print("Dash application has finished running.") |