Spaces:
Runtime error
Runtime error
| # Copyright (c) 2025 Stephen G. Pope | |
| # | |
| # This program is free software; you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation; either version 2 of the License, or | |
| # (at your option) any later version. | |
| # | |
| # This program is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details. | |
| # | |
| # You should have received a copy of the GNU General Public License along | |
| # with this program; if not, write to the Free Software Foundation, Inc., | |
| # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
| import os | |
| import subprocess | |
| import json | |
| import re | |
| from services.file_management import download_file | |
| from config import LOCAL_STORAGE_PATH | |
def get_extension_from_format(format_name):
    """Return the file extension to use for an FFmpeg ``-f`` format name.

    Args:
        format_name: Format name as given to FFmpeg's ``-f`` option. Matching
            is case-insensitive and ignores surrounding whitespace. ``None``
            or an empty value is treated as an unknown format.

    Returns:
        The extension without a leading dot. Falls back to ``'mp4'`` when the
        format is missing or not present in the table below.
    """
    # Mapping of common format names to file extensions
    format_to_extension = {
        'mp4': 'mp4',
        'mov': 'mov',
        'avi': 'avi',
        'mkv': 'mkv',
        'webm': 'webm',
        'gif': 'gif',
        'apng': 'apng',
        'jpg': 'jpg',
        'jpeg': 'jpg',
        'png': 'png',
        'image2': 'png',  # Assume png for image2 format
        'rawvideo': 'raw',
        'mp3': 'mp3',
        'wav': 'wav',
        'aac': 'aac',
        'flac': 'flac',
        'ogg': 'ogg',
        # FFmpeg muxer names that differ from the usual file extension.
        'matroska': 'mkv',
        'adts': 'aac',
        'mpegts': 'ts',
        'flv': 'flv',
    }
    default_extension = 'mp4'  # Default to mp4 if unknown

    if not format_name:
        return default_extension

    # str() guards against non-string arguments arriving from JSON payloads.
    key = str(format_name).strip().lower()
    return format_to_extension.get(key, default_extension)
def _parse_probe_number(value, cast):
    """Convert an ffprobe field with *cast*; return None if absent or unparseable."""
    if value is None:
        return None
    try:
        return cast(value)
    except (TypeError, ValueError):
        return None


def get_metadata(filename, metadata_requests, job_id):
    """Collect the requested metadata for a media file.

    Args:
        filename: Path of the media file to inspect.
        metadata_requests: Mapping whose truthy entries select what to
            collect: ``thumbnail``, ``filesize``, ``duration``, ``bitrate``
            and ``encoder``.
        job_id: Not used by this function; kept so the signature stays
            stable for existing callers.

    Returns:
        A dict holding the requested values that could be determined:
        ``thumbnail`` (local path of the generated JPEG), ``filesize``
        (bytes), ``duration`` (float), ``bitrate`` (int) and ``encoder``
        (dict with optional ``video`` / ``audio`` codec names). A value that
        cannot be determined is omitted instead of raising.

    Raises:
        OSError: If ``filename`` cannot be stat'ed for ``filesize`` or the
            ``ffmpeg`` / ``ffprobe`` executable cannot be started.
    """
    metadata = {}

    if metadata_requests.get('thumbnail'):
        thumbnail_filename = f"{os.path.splitext(filename)[0]}_thumbnail.jpg"
        thumbnail_command = [
            'ffmpeg',
            '-y',  # overwrite a stale thumbnail instead of failing/prompting
            '-i', filename,
            # Raw string: the backslash must reach FFmpeg to escape the comma.
            '-vf', r'select=eq(n\,0)',
            '-vframes', '1',
            thumbnail_filename,
        ]
        try:
            subprocess.run(thumbnail_command, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            # Best effort: a missing thumbnail must not fail the whole job.
            print(f"Thumbnail generation failed: {e.stderr}")
        else:
            if os.path.exists(thumbnail_filename):
                metadata['thumbnail'] = thumbnail_filename  # Return local path instead of URL

    if metadata_requests.get('filesize'):
        metadata['filesize'] = os.path.getsize(filename)

    wants_probe = any(
        metadata_requests.get(key) for key in ('encoder', 'duration', 'bitrate')
    )
    if wants_probe:
        ffprobe_command = [
            'ffprobe',
            '-v', 'quiet',
            '-print_format', 'json',
            '-show_format',
            '-show_streams',
            filename,
        ]
        result = subprocess.run(ffprobe_command, capture_output=True, text=True)
        try:
            probe_data = json.loads(result.stdout)
        except json.JSONDecodeError:
            print(f"ffprobe produced no usable output (exit code {result.returncode})")
            probe_data = {}
        if not isinstance(probe_data, dict):
            probe_data = {}

        # On failure or for some outputs ffprobe omits these sections/fields.
        format_info = probe_data.get('format') or {}

        if metadata_requests.get('duration'):
            duration = _parse_probe_number(format_info.get('duration'), float)
            if duration is not None:
                metadata['duration'] = duration

        if metadata_requests.get('bitrate'):
            bitrate = _parse_probe_number(format_info.get('bit_rate'), int)
            if bitrate is not None:
                metadata['bitrate'] = bitrate

        if metadata_requests.get('encoder'):
            encoders = {}
            for stream in probe_data.get('streams') or []:
                codec_type = stream.get('codec_type')
                # A later stream of the same type overrides an earlier one.
                if codec_type in ('video', 'audio'):
                    encoders[codec_type] = stream.get('codec_name', 'unknown')
            metadata['encoder'] = encoders

    return metadata
# Matches subtitles='<url>' or subtitles="<url>"; group 2 is the closing quote
# (empty when the filter string ends before one appears).
_SUBTITLES_URL_RE = re.compile(r"subtitles=['\"]([^'\"]+)(['\"]?)")


def _append_options(command, options):
    """Append each option flag and, when present and not None, its argument."""
    for option in options or []:
        command.append(option["option"])
        if option.get("argument") is not None:
            command.append(str(option["argument"]))


def _remove_files(paths):
    """Delete every file in *paths* that still exists."""
    for path in paths:
        if os.path.exists(path):
            os.remove(path)


def process_ffmpeg_compose(data, job_id):
    """Build and run an FFmpeg command described by a request payload.

    Args:
        data: Request mapping. Keys read here:
            ``global_options`` (optional list of ``{"option", "argument"}``),
            ``inputs`` (list of ``{"file_url", "options"?}``),
            ``filters`` (optional list of ``{"filter"}``; joined with ``;``
            into one ``-filter_complex``),
            ``outputs`` (list of ``{"options"?}``) and
            ``metadata`` (optional mapping forwarded to ``get_metadata``).
        job_id: Identifier used to name the output files
            (``<job_id>_output_<index>.<ext>`` under ``LOCAL_STORAGE_PATH``).

    Returns:
        Tuple ``(output_filenames, metadata)``; ``metadata`` holds one dict
        per output when ``data["metadata"]`` is truthy, otherwise it is empty.

    Raises:
        Exception: If FFmpeg exits with a non-zero status; the message
            carries FFmpeg's stderr and the original error is chained.
        KeyError: If a required key (``inputs``, ``outputs``, ``file_url``,
            ``option``, ``filter``) is missing from the payload.
    """
    output_filenames = []
    input_paths = []
    subtitles_paths = []  # Track downloaded subtitles/filter files

    def replace_subtitles_url(match):
        # Download the referenced subtitles file and point the filter at it.
        local_path = download_file(match.group(1), LOCAL_STORAGE_PATH)
        subtitles_paths.append(local_path)
        fixed_path = local_path.replace('\\', '/')
        # Always close with a single quote so subtitles="..." does not end up
        # with mismatched quotes; emit nothing if there was no closing quote.
        closing_quote = "'" if match.group(2) else ""
        return f"subtitles='{fixed_path}{closing_quote}"

    try:
        # Build FFmpeg command
        command = ["ffmpeg"]

        # Add global options
        _append_options(command, data.get("global_options", []))

        # Add inputs
        for input_data in data["inputs"]:
            _append_options(command, input_data.get("options"))
            input_path = download_file(input_data["file_url"], LOCAL_STORAGE_PATH)
            input_paths.append(input_path)
            command.extend(["-i", input_path])

        # Add filters
        if data.get("filters"):
            new_filters = [
                _SUBTITLES_URL_RE.sub(replace_subtitles_url, filter_obj["filter"])
                for filter_obj in data["filters"]
            ]
            command.extend(["-filter_complex", ";".join(new_filters)])

        # Add outputs
        for i, output in enumerate(data["outputs"]):
            output_options = output.get("options") or []
            # The first -f option decides the extension of the output file.
            format_name = next(
                (opt.get("argument") for opt in output_options if opt["option"] == "-f"),
                None,
            )
            extension = get_extension_from_format(str(format_name)) if format_name else 'mp4'
            output_filename = os.path.join(
                LOCAL_STORAGE_PATH, f"{job_id}_output_{i}.{extension}"
            )
            output_filenames.append(output_filename)
            _append_options(command, output_options)
            command.append(output_filename)

        # Execute FFmpeg command
        try:
            subprocess.run(command, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            raise Exception(f"FFmpeg command failed: {e.stderr}") from e
    finally:
        # Clean up downloaded inputs and subtitles even when a download or
        # FFmpeg itself fails, so failed jobs do not leak files.
        _remove_files(input_paths)
        _remove_files(subtitles_paths)

    # Get metadata if requested
    metadata = []
    if data.get("metadata"):
        metadata = [
            get_metadata(output_filename, data["metadata"], job_id)
            for output_filename in output_filenames
        ]
    return output_filenames, metadata