Spaces:
Runtime error
Runtime error
| # Copyright (c) 2025 Stephen G. Pope | |
| # | |
| # This program is free software; you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation; either version 2 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU General Public License along | |
| # with this program; if not, write to the Free Software Foundation, Inc., | |
| # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
| import os | |
| import ffmpeg | |
| import requests | |
| from services.file_management import download_file | |
| # Set the default local storage directory | |
| STORAGE_PATH = "/tmp/" | |
| def process_conversion(media_url, job_id, bitrate='128k', webhook_url=None): | |
| """Convert media to MP3 format with specified bitrate.""" | |
| input_filename = download_file(media_url, os.path.join(STORAGE_PATH, f"{job_id}_input")) | |
| output_filename = f"{job_id}.mp3" | |
| output_path = os.path.join(STORAGE_PATH, output_filename) | |
| try: | |
| # Convert media file to MP3 with specified bitrate | |
| ( | |
| ffmpeg | |
| .input(input_filename) | |
| .output(output_path, acodec='libmp3lame', audio_bitrate=bitrate) | |
| .overwrite_output() | |
| .run(capture_stdout=True, capture_stderr=True) | |
| ) | |
| os.remove(input_filename) | |
| print(f"Conversion successful: {output_path} with bitrate {bitrate}") | |
| # Ensure the output file exists locally before attempting upload | |
| if not os.path.exists(output_path): | |
| raise FileNotFoundError(f"Output file {output_path} does not exist after conversion.") | |
| return output_path | |
| except Exception as e: | |
| print(f"Conversion failed: {str(e)}") | |
| raise | |
| def process_video_combination(media_urls, job_id, webhook_url=None): | |
| """Combine multiple videos into one.""" | |
| input_files = [] | |
| output_filename = f"{job_id}.mp4" | |
| output_path = os.path.join(STORAGE_PATH, output_filename) | |
| try: | |
| # Download all media files | |
| for i, media_item in enumerate(media_urls): | |
| url = media_item['video_url'] | |
| input_filename = download_file(url, os.path.join(STORAGE_PATH, f"{job_id}_input_{i}")) | |
| input_files.append(input_filename) | |
| # Generate an absolute path concat list file for FFmpeg | |
| concat_file_path = os.path.join(STORAGE_PATH, f"{job_id}_concat_list.txt") | |
| with open(concat_file_path, 'w') as concat_file: | |
| for input_file in input_files: | |
| # Write absolute paths to the concat list | |
| concat_file.write(f"file '{os.path.abspath(input_file)}'\n") | |
| # Use the concat demuxer to concatenate the videos | |
| ( | |
| ffmpeg.input(concat_file_path, format='concat', safe=0). | |
| output(output_path, c='copy'). | |
| run(overwrite_output=True) | |
| ) | |
| # Clean up input files | |
| for f in input_files: | |
| os.remove(f) | |
| os.remove(concat_file_path) # Remove the concat list file after the operation | |
| print(f"Video combination successful: {output_path}") | |
| # Check if the output file exists locally before upload | |
| if not os.path.exists(output_path): | |
| raise FileNotFoundError(f"Output file {output_path} does not exist after combination.") | |
| return output_path | |
| except Exception as e: | |
| print(f"Video combination failed: {str(e)}") | |
| raise | |