|
|
import gradio as gr |
|
|
import os |
|
|
import zipfile |
|
|
import tempfile |
|
|
import shutil |
|
|
from pathlib import Path |
|
|
import mimetypes |
|
|
import time |
|
|
import socket |
|
|
|
|
|
|
|
|
# Name of the uploads directory, relative to the current working directory.
UPLOADS_FOLDER = "uploads"

# Create the directory at import time; an already-existing directory is fine.
Path(UPLOADS_FOLDER).mkdir(parents=True, exist_ok=True)
|
|
|
|
|
def find_available_port(start_port=7860, max_attempts=10):
    """Return the first bindable TCP port in a small range.

    Ports ``start_port`` .. ``start_port + max_attempts - 1`` are probed in
    order by binding a throwaway IPv4 stream socket on all interfaces.

    Args:
        start_port: First port number to try.
        max_attempts: How many consecutive ports to probe.

    Returns:
        The first port whose bind succeeded, or ``start_port`` itself when
        every probe failed with ``OSError``.
    """
    for candidate in range(start_port, start_port + max_attempts):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.bind(("", candidate))
        except OSError:
            # Port is taken (or otherwise unusable): move on to the next one.
            continue
        # The probe socket is closed again by now, so the port is released.
        return candidate

    # Nothing in the range could be bound; fall back to the requested start.
    return start_port
|
|
|
|
|
def process_file(file_obj, file_name, action):
    """
    Prepare a single uploaded file for download, as-is or zipped.

    Args:
        file_obj: The upload. Either a dict with "path" and "name" keys, an
            object whose ``name`` attribute is the file path, or a plain
            path string.
        file_name: Optional custom download name typed by the user. Only its
            final path component is used; when blank or unusable, the
            upload's own name is used instead.
        action: One of "original", "zip" or "zip_with_password".

    Returns:
        A ``(file, status)`` tuple. ``file`` is a dict with "path" and "name"
        keys on success and ``None`` on failure; ``status`` is a user-facing
        message. Exceptions are reported through ``status``, not raised.
    """
    if not file_obj:
        return None, "❌ No file uploaded"

    def leaf_name(candidate):
        """Return the last path component of *candidate*, or "" if unusable."""
        if not candidate:
            return ""
        normalized = str(candidate).strip().replace("\\", "/")
        leaf = os.path.basename(normalized).strip()
        return "" if leaf in ("", ".", "..") else leaf

    temp_dir = None
    succeeded = False
    try:
        if isinstance(file_obj, dict):
            file_path = file_obj["path"]
            original_name = file_obj["name"]
        else:
            file_path = file_obj.name if hasattr(file_obj, "name") else file_obj
            original_name = os.path.basename(file_path)

        # The custom name is user input that ends up in a filesystem path, so
        # reduce it to a bare file name: "../x" or "/etc/x" must not be able
        # to escape the temporary directory.
        custom_name = leaf_name(file_name) or leaf_name(original_name) or "file"

        temp_dir = tempfile.mkdtemp()

        # Stage the renamed copy in its own sub-directory so that an archive
        # written to temp_dir can never share a path with it (a custom name
        # such as "a.zip" would otherwise make the zip overwrite its input).
        staging_dir = os.path.join(temp_dir, "source")
        os.makedirs(staging_dir)
        original_path = os.path.join(staging_dir, custom_name)
        shutil.copy2(file_path, original_path)

        if action == "original":
            succeeded = True
            return {
                "path": original_path,
                "name": custom_name,
            }, f"✅ Ready to download: {custom_name}"

        if action == "zip":
            zip_name = f"{os.path.splitext(custom_name)[0]}.zip"
            zip_path = os.path.join(temp_dir, zip_name)
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                zipf.write(original_path, custom_name)
            succeeded = True
            return {
                "path": zip_path,
                "name": zip_name,
            }, f"✅ Zipped as: {zip_name}"

        if action == "zip_with_password":
            try:
                import pyminizip
            except ImportError:
                return None, "❌ pyminizip not installed. Install with: pip install pyminizip"

            zip_name = f"{os.path.splitext(custom_name)[0]}_protected.zip"
            zip_path = os.path.join(temp_dir, zip_name)
            # NOTE(review): the password is a fixed value that is also shown
            # in the UI, so this archive offers no real confidentiality.
            pyminizip.compress(
                original_path,
                None,
                zip_path,
                "password123",
                5,
            )
            succeeded = True
            return {
                "path": zip_path,
                "name": zip_name,
            }, "✅ Password-protected zip created (password: password123)"

        return None, "❌ Invalid action selected"

    except Exception as e:
        # UI boundary: surface the failure as a status message.
        return None, f"❌ Error: {str(e)}"
    finally:
        # Only a successful result still needs the files on disk.
        if temp_dir is not None and not succeeded:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
def process_multiple_files(files, action):
    """
    Bundle several uploaded files into a single downloadable zip.

    Args:
        files: Sequence of uploads. Each item is a dict with "path" and
            "name" keys, an object whose ``name`` attribute is the file path,
            or a plain path string.
        action: "individual" puts every file directly into "files.zip";
            "separate" zips each file on its own and puts those zips into
            "separate_files.zip".

    Returns:
        A ``(file, status)`` tuple. ``file`` is a dict with "path" and "name"
        keys on success and ``None`` on failure; ``status`` is a user-facing
        message. Exceptions are reported through ``status``, not raised.
    """
    if not files:
        return None, "❌ No files uploaded"

    def resolve(file_obj):
        """Return ``(path, display_name)`` for one upload item."""
        if isinstance(file_obj, dict):
            return file_obj["path"], file_obj["name"]
        path = file_obj.name if hasattr(file_obj, "name") else file_obj
        return path, os.path.basename(path)

    def unique(stem, suffix, used):
        """Return ``stem + suffix``, adding ``_1``, ``_2``... until unused."""
        candidate = f"{stem}{suffix}"
        counter = 1
        while candidate in used:
            candidate = f"{stem}_{counter}{suffix}"
            counter += 1
        used.add(candidate)
        return candidate

    temp_dir = None
    succeeded = False
    try:
        temp_dir = tempfile.mkdtemp()
        uploads = [resolve(item) for item in files]

        if action == "individual":
            zip_name = "files.zip"
            zip_path = os.path.join(temp_dir, zip_name)
            # Uploads from different folders may share a name; give each
            # archive member a distinct one so none is shadowed on extraction.
            taken = set()
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                for file_path, file_name in uploads:
                    stem, ext = os.path.splitext(file_name)
                    zipf.write(file_path, unique(stem, ext, taken))

            succeeded = True
            return {
                "path": zip_path,
                "name": zip_name,
            }, f"✅ Created zip with {len(files)} files"

        if action == "separate":
            zip_name = "separate_files.zip"
            zip_path = os.path.join(temp_dir, zip_name)
            # Per-file zips live in their own directory so that an upload
            # called "separate_files.*" cannot clobber the outer archive
            # while it is open for writing.
            parts_dir = os.path.join(temp_dir, "parts")
            os.makedirs(parts_dir)
            taken = set()
            with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
                for file_path, file_name in uploads:
                    stem = os.path.splitext(os.path.basename(file_name))[0]
                    # "a.txt" and "a.csv" would both become "a.zip".
                    individual_zip_name = unique(stem, ".zip", taken)
                    individual_zip_path = os.path.join(parts_dir, individual_zip_name)
                    with zipfile.ZipFile(
                        individual_zip_path, "w", zipfile.ZIP_DEFLATED
                    ) as ind_zip:
                        ind_zip.write(file_path, file_name)
                    zipf.write(individual_zip_path, individual_zip_name)

            # The per-file zips are now stored inside the outer archive.
            shutil.rmtree(parts_dir, ignore_errors=True)
            succeeded = True
            return {
                "path": zip_path,
                "name": zip_name,
            }, f"✅ Created zip with {len(files)} individual zips"

        return None, "❌ Invalid action selected"

    except Exception as e:
        # UI boundary: surface the failure as a status message.
        return None, f"❌ Error: {str(e)}"
    finally:
        # Only a successful result still needs the files on disk.
        if temp_dir is not None and not succeeded:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
def get_file_info(file_obj):
    """Build a Markdown summary of an uploaded file.

    Args:
        file_obj: The upload. Either a dict with "path" and "name" keys, an
            object whose ``name`` attribute is the file path, or a plain
            path string.

    Returns:
        A Markdown string listing name, size, extension, top-level MIME
        type, the ``getctime``/``getmtime`` timestamps and the path;
        "No file selected" for a falsy input; or an error message string if
        the file could not be inspected.
    """
    if not file_obj:
        return "No file selected"

    try:
        if isinstance(file_obj, dict):
            file_path = file_obj["path"]
            file_name = file_obj["name"]
        else:
            file_path = file_obj.name if hasattr(file_obj, "name") else file_obj
            file_name = os.path.basename(file_path)

        file_size = os.path.getsize(file_path)
        file_ext = os.path.splitext(file_name)[1].lower()

        # Guess the type from the display name first so it always agrees
        # with the extension shown above; the stored path is only a fallback.
        mime_type, _ = mimetypes.guess_type(file_name)
        if mime_type is None:
            mime_type, _ = mimetypes.guess_type(file_path)
        file_type = mime_type.split("/")[0] if mime_type else "Unknown"

        # NOTE: getctime is the metadata-change time on Unix and the
        # creation time on Windows.
        ctime = time.ctime(os.path.getctime(file_path))
        mtime = time.ctime(os.path.getmtime(file_path))

        lines = [
            "",
            "📄 **File Information:**",
            "",
            f"**Name:** {file_name}",
            f"**Size:** {file_size:,} bytes ({file_size/1024:.2f} KB)",
            f"**Extension:** {file_ext}",
            f"**Type:** {file_type}",
            f"**Created:** {ctime}",
            f"**Modified:** {mtime}",
            f"**Path:** {file_path}",
            "",
        ]
        return "\n".join(lines)

    except Exception as e:
        # UI boundary: surface the failure as text in the info panel.
        return f"❌ Error getting file info: {str(e)}"
|
|
|
|
|
|
|
|
# Static option lists and help text, named up front so the layout below reads
# as pure structure.
_SINGLE_ACTIONS = [
    ("Download Original", "original"),
    ("Download as ZIP", "zip"),
    ("Password-protected ZIP", "zip_with_password"),
]
_MULTI_ACTIONS = [
    ("Combine all files into one ZIP", "individual"),
    ("Create separate ZIPs for each file", "separate"),
]
_MULTI_FILE_TYPES = [
    "image", "video", "audio",
    "text", ".pdf", ".zip", ".txt",
    ".doc", ".docx", ".xls", ".xlsx",
]
_BATCH_OPTIONS = [
    "Create individual ZIPs",
    "Create combined ZIP",
    "Rename with timestamp",
    "Add to existing ZIP",
]
_ARCHIVE_FORMATS = [".zip", ".7z", ".tar.gz"]
_HELP_MARKDOWN = """
## How to Use:

### Single File Tab:
1. **Upload** a single file
2. (Optional) Enter a custom filename
3. **Choose action**: Download original, as ZIP, or password-protected ZIP
4. Click **Process File**

### Multiple Files Tab:
1. **Upload** multiple files (Ctrl+Click to select multiple)
2. **Choose action**: Combine into one ZIP or create separate ZIPs
3. Click **Process Files**

### Batch Processing Tab:
1. **Upload** multiple files
2. **Select processing options**
3. Choose archive format and compression level
4. Click **Process Batch**

## Features:
- ✅ Single file upload and download
- ✅ Multiple file upload and batch processing
- ✅ ZIP file creation with compression
- ✅ Password-protected ZIPs (requires pyminizip)
- ✅ File information display
- ✅ Custom filename support
- ✅ Multiple archive formats

## Notes:
- Files are processed in temporary storage
- Original files are not modified
- Large files may take time to process
- Password for protected ZIPs: `password123`
- For Gradio 6+ compatibility, file objects are handled differently
"""

# Page layout: three tabs (single file, multiple files, batch) followed by a
# collapsible help section, then the click wiring for the first two tabs.
with gr.Blocks(title="File Upload & Download Manager") as iface:
    gr.Markdown("# 📁 File Upload & Download Manager")
    gr.Markdown("Upload files and download them as original or zipped")

    with gr.Tabs():
        # --- Tab 1: one file, optional rename, three download modes ---
        with gr.Tab("Single File"):
            with gr.Row():
                with gr.Column(scale=2):
                    single_file = gr.File(file_count="single", label="Upload a File")
                    file_name_input = gr.Textbox(
                        label="Custom Filename (optional)",
                        placeholder="Leave empty to keep original name...",
                        info="Enter a custom name for the downloaded file",
                    )
                    single_action = gr.Radio(
                        choices=_SINGLE_ACTIONS,
                        value="original",
                        label="Select Action",
                    )
                    single_btn = gr.Button("Process File", variant="primary")

                with gr.Column(scale=1):
                    file_info = gr.Markdown(label="File Information")
                    single_status = gr.Textbox(label="Status", interactive=False)
                    single_download = gr.File(
                        label="Download Processed File",
                        interactive=False,
                    )

            # Refresh the info panel whenever the selected file changes.
            single_file.change(fn=get_file_info, inputs=[single_file], outputs=file_info)

        # --- Tab 2: several files into one download ---
        with gr.Tab("Multiple Files"):
            with gr.Row():
                with gr.Column(scale=2):
                    multi_files = gr.File(
                        label="Upload Multiple Files",
                        file_count="multiple",
                        file_types=_MULTI_FILE_TYPES,
                    )
                    multi_action = gr.Radio(
                        choices=_MULTI_ACTIONS,
                        value="individual",
                        label="Select Action",
                    )
                    multi_btn = gr.Button("Process Files", variant="primary")

                with gr.Column(scale=1):
                    multi_status = gr.Textbox(label="Status", interactive=False)
                    multi_download = gr.File(
                        label="Download Processed Files",
                        interactive=False,
                    )

        # --- Tab 3: batch options, archive format and compression level ---
        with gr.Tab("Batch Processing"):
            with gr.Row():
                with gr.Column():
                    gr.Markdown("### Upload Multiple Files for Batch Processing")

                    batch_files = gr.File(
                        label="Upload Files",
                        file_count="multiple",
                        file_types=None,
                    )
                    batch_options = gr.CheckboxGroup(
                        choices=_BATCH_OPTIONS,
                        value=["Create combined ZIP"],
                        label="Processing Options",
                    )

                    with gr.Row():
                        batch_format = gr.Dropdown(
                            choices=_ARCHIVE_FORMATS,
                            value=".zip",
                            label="Archive Format",
                        )
                        compression_level = gr.Slider(
                            label="Compression Level",
                            minimum=1,
                            maximum=9,
                            step=1,
                            value=6,
                        )

                    batch_btn = gr.Button("Process Batch", variant="primary", size="lg")

                with gr.Column():
                    batch_status = gr.Textbox(label="Status", interactive=False, lines=3)
                    batch_download = gr.File(label="Download Results", interactive=False)

    with gr.Accordion("📖 Instructions & Features", open=False):
        gr.Markdown(_HELP_MARKDOWN)

    # Button wiring: each handler returns (download file, status message).
    single_btn.click(
        fn=process_file,
        inputs=[single_file, file_name_input, single_action],
        outputs=[single_download, single_status],
    )
    multi_btn.click(
        fn=process_multiple_files,
        inputs=[multi_files, multi_action],
        outputs=[multi_download, multi_status],
    )
|
|
|
|
|
|
|
|
def process_batch(files, options, format_type, compression):
    """Archive a batch of uploads according to the selected options.

    Args:
        files: Sequence of uploads. Each item is a dict with "path" and
            "name" keys, an object whose ``name`` attribute is the file path,
            or a plain path string.
        options: Selected option labels. "Create combined ZIP" and
            "Create individual ZIPs" each produce archives; "Rename with
            timestamp" appends the current Unix time to generated names.
            Other labels are ignored.
        format_type: Archive suffix. ".zip" and ".tar.gz" are supported; any
            other value is rejected with an error status.
        compression: Compression level; coerced to an int and clamped to 0-9.

    Returns:
        A ``(file, status)`` tuple. ``file`` is a dict with "path" and "name"
        keys, or ``None`` on failure. A single archive is returned directly;
        several are bundled into ``batch_results<format_type>``. Exceptions
        are reported through ``status``, not raised.
    """
    if not files:
        return None, "❌ No files uploaded"

    # Local import: tarfile is only needed here and is not imported at the
    # top of the file.
    import tarfile

    options = options or []
    if format_type not in (".zip", ".tar.gz"):
        # Previously such formats recorded an archive path that was never
        # written; fail loudly instead.
        return None, f"❌ Unsupported archive format: {format_type} (supported: .zip, .tar.gz)"

    def resolve(file_obj):
        """Return ``(path, display_name)`` for one upload item."""
        if isinstance(file_obj, dict):
            return file_obj["path"], file_obj["name"]
        path = file_obj.name if hasattr(file_obj, "name") else file_obj
        return path, os.path.basename(path)

    def unique(stem, suffix, used):
        """Return ``stem + suffix``, adding ``_1``, ``_2``... until unused."""
        candidate = f"{stem}{suffix}"
        counter = 1
        while candidate in used:
            candidate = f"{stem}_{counter}{suffix}"
            counter += 1
        used.add(candidate)
        return candidate

    def write_archive(archive_path, members, level=None):
        """Write ``(source_path, arcname)`` pairs in the selected format."""
        extra = {} if level is None else {"compresslevel": level}
        if format_type == ".zip":
            with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED, **extra) as zf:
                for source, arcname in members:
                    zf.write(source, arcname)
        else:
            with tarfile.open(archive_path, "w:gz", **extra) as tf:
                for source, arcname in members:
                    tf.add(source, arcname=arcname)

    temp_dir = None
    succeeded = False
    try:
        # The slider may deliver a float; zlib insists on an int.
        level = max(0, min(9, int(compression)))

        temp_dir = tempfile.mkdtemp()
        # Intermediate archives live apart from the final bundle so that no
        # generated name can collide with "batch_results<format>".
        parts_dir = os.path.join(temp_dir, "parts")
        os.makedirs(parts_dir)

        uploads = [resolve(item) for item in files]
        stamp = f"_{int(time.time())}" if "Rename with timestamp" in options else ""
        results = []
        archive_names = set()

        if "Create combined ZIP" in options:
            zip_name = unique("combined", format_type, archive_names)
            zip_path = os.path.join(parts_dir, zip_name)
            member_names = set()
            members = []
            for file_path, file_name in uploads:
                stem, ext = os.path.splitext(file_name)
                members.append((file_path, unique(f"{stem}{stamp}", ext, member_names)))
            write_archive(zip_path, members, level)
            results.append({"path": zip_path, "name": zip_name})

        if "Create individual ZIPs" in options:
            for file_path, file_name in uploads:
                base_name = os.path.splitext(os.path.basename(file_name))[0] + stamp
                # Two uploads with the same stem (or one called "combined")
                # must not overwrite an archive produced earlier.
                individual_zip_name = unique(base_name, format_type, archive_names)
                individual_zip_path = os.path.join(parts_dir, individual_zip_name)
                write_archive(individual_zip_path, [(file_path, file_name)], level)
                results.append({"path": individual_zip_path, "name": individual_zip_name})

        if not results:
            return None, "⚠️ No processing options selected"

        if len(results) > 1:
            final_zip_name = f"batch_results{format_type}"
            final_zip_path = os.path.join(temp_dir, final_zip_name)
            # Written in the selected format so the suffix matches the content.
            write_archive(final_zip_path, [(r["path"], r["name"]) for r in results])
            output_file = {"path": final_zip_path, "name": final_zip_name}
        else:
            output_file = results[0]

        succeeded = True
        return output_file, f"✅ Processed {len(files)} file(s) with {len(options)} option(s)"

    except Exception as e:
        # UI boundary: surface the failure as a status message.
        return None, f"❌ Error: {str(e)}"
    finally:
        # Only a successful result still needs the files on disk.
        if temp_dir is not None and not succeeded:
            shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
|
# Gradio event listeners have to be registered while a Blocks context is
# active, so re-enter the `iface` layout before wiring the batch button.
# process_batch returns (download file, status message), matching `outputs`.
with iface:
    batch_btn.click(
        fn=process_batch,
        inputs=[batch_files, batch_options, batch_format, compression_level],
        outputs=[batch_download, batch_status],
    )
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Probe up to 20 consecutive ports, starting at 7860, for one that binds.
    available_port = find_available_port(start_port=7860, max_attempts=20)
    print(f"Starting server on port {available_port}...")

    # Launch settings collected in one place: listen on all interfaces, on
    # the port chosen above, with error display on and no public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": available_port,
        "show_error": True,
        "share": False,
        "theme": gr.themes.Soft(),
    }
    iface.launch(**launch_options)