Spaces:
Running
Running
| import os | |
| import zipfile | |
| import logging | |
| from flask import Flask, render_template, request, send_file, jsonify, url_for | |
| from werkzeug.utils import secure_filename | |
| from main import main | |
| app = Flask(__name__) | |
| # Directory for temporary file uploads | |
| UPLOAD_FOLDER = '/tmp/uploads' | |
| if not os.path.exists(UPLOAD_FOLDER): | |
| os.makedirs(UPLOAD_FOLDER) | |
| # Configure logging | |
| logging.basicConfig(level=logging.DEBUG) | |
| def home(): | |
| logging.info("Rendering home page") | |
| return render_template('index.html') | |
| def process(): | |
| logging.info("Process endpoint called") | |
| # Get the operation type | |
| operation = request.form.get('operation') | |
| source_lang = request.form.get('source-lang') | |
| target_lang = request.form.get('target-lang') | |
| dubbing_type = request.form.get('dubbing-type') | |
| dubbing_lang = request.form.get('dubbing-lang') | |
| logging.debug(f"Operation: {operation}, Source Lang: {source_lang}, Target Lang: {target_lang}, Dubbing Type: {dubbing_type}, Dubbing Lang: {dubbing_lang}") | |
| # Initialize a dictionary to store in-memory files | |
| in_memory_files = {} | |
| # Handle file uploads based on the operation | |
| if operation in ['colorize', 'translate', 'both']: | |
| logging.info("Handling image files for operation") | |
| uploaded_files = request.files.getlist('images') | |
| for file in uploaded_files: | |
| if file.filename != '': | |
| filename = secure_filename(file.filename) | |
| in_memory_files[filename] = file.read() | |
| logging.debug(f"Uploaded file: {filename}") | |
| elif operation == 'subtitle': | |
| logging.info("Handling video file for subtitle operation") | |
| video_file = request.files.get('images') | |
| if video_file and video_file.filename != '': | |
| filename = secure_filename(video_file.filename) | |
| in_memory_files[filename] = video_file.read() | |
| logging.debug(f"Uploaded video file: {filename}") | |
| elif operation == 'dubbing': | |
| logging.info("Handling video and SRT files for dubbing operation") | |
| video_file = request.files.get('video-file') | |
| srt_file = request.files.get('srt-file') | |
| if video_file and video_file.filename != '': | |
| video_filename = secure_filename(video_file.filename) | |
| in_memory_files[video_filename] = video_file.read() | |
| logging.debug(f"Uploaded video file: {video_filename}") | |
| if srt_file and srt_file.filename != '': | |
| srt_filename = secure_filename(srt_file.filename) | |
| in_memory_files[srt_filename] = srt_file.read() | |
| logging.debug(f"Uploaded SRT file: {srt_filename}") | |
| # Process the files | |
| try: | |
| logging.info("Processing files") | |
| results = main(in_memory_files, operation, source_lang, target_lang, dubbing_type, dubbing_lang) | |
| logging.info("Files processed successfully") | |
| except Exception as e: | |
| logging.error(f"Error during processing: {str(e)}") | |
| return jsonify({'status': 'error', 'message': str(e)}), 500 | |
| # Save results to a ZIP file | |
| zip_filename = os.path.join(UPLOAD_FOLDER, 'results.zip') | |
| try: | |
| logging.info("Saving results to ZIP file") | |
| with zipfile.ZipFile(zip_filename, 'w', compression=zipfile.ZIP_DEFLATED) as zipf: | |
| if operation == 'dubbing': | |
| for filename, file_path in results.items(): | |
| with open(file_path, 'rb') as f: | |
| zipf.writestr(filename, f.read()) | |
| logging.debug(f"Added {filename} to ZIP") | |
| else: | |
| for filename, file_data in results.items(): | |
| zipf.writestr(filename, file_data) | |
| logging.debug(f"Added {filename} to ZIP") | |
| logging.info("Results saved to ZIP file successfully") | |
| except Exception as e: | |
| logging.error(f"Error during zipping the files: {str(e)}") | |
| return jsonify({'status': 'error', 'message': 'Error during zipping the files: ' + str(e)}), 500 | |
| # Generate a download URL | |
| download_url = url_for('download', filename='results.zip', _external=True, _scheme='https') | |
| logging.info(f"Generated download URL: {download_url}") | |
| # Return the download link as JSON | |
| return jsonify({'status': 'ready', 'download_url': download_url}) | |
| def download(filename): | |
| logging.info(f"Download endpoint called for file: {filename}") | |
| file_path = os.path.join(UPLOAD_FOLDER, filename) | |
| if not os.path.exists(file_path): | |
| logging.error("File not found for download") | |
| return jsonify({'status': 'error', 'message': 'File not found'}), 404 | |
| logging.info("File found, preparing to send") | |
| if filename.endswith('.mp4'): | |
| return send_file(file_path, as_attachment=True, mimetype='video/mp4') | |
| return send_file(file_path, as_attachment=True) | |
| def delete_uploads(): | |
| logging.info("Delete uploads endpoint called") | |
| try: | |
| for filename in os.listdir(UPLOAD_FOLDER): | |
| file_path = os.path.join(UPLOAD_FOLDER, filename) | |
| if os.path.isfile(file_path): | |
| os.remove(file_path) | |
| logging.debug(f"Deleted file: {filename}") | |
| logging.info("All files deleted successfully") | |
| return jsonify({'status': 'success', 'message': 'Files deleted successfully'}) | |
| except Exception as e: | |
| logging.error(f"Error deleting files: {str(e)}") | |
| return jsonify({'status': 'error', 'message': 'Error deleting files: ' + str(e)}), 500 | |
| if __name__ == '__main__': | |
| app.run(host='0.0.0.0', port=7860) |