# Copyright (c) 2025 Stephen G. Pope # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. import os import ffmpeg import logging import requests import subprocess from services.file_management import download_file # Set the default local storage directory STORAGE_PATH = "/tmp/" # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Define the path to the fonts directory FONTS_DIR = '/usr/share/fonts/custom' # Create the FONT_PATHS dictionary by reading the fonts directory FONT_PATHS = {} for font_file in os.listdir(FONTS_DIR): if font_file.endswith('.ttf') or font_file.endswith('.TTF'): font_name = os.path.splitext(font_file)[0] FONT_PATHS[font_name] = os.path.join(FONTS_DIR, font_file) # logger.info(f"Available fonts: {FONT_PATHS}") # Create a list of acceptable font names ACCEPTABLE_FONTS = list(FONT_PATHS.keys()) #logger.info(f"Acceptable font names: {ACCEPTABLE_FONTS}") # Match font files with fontconfig names def match_fonts(): try: result = subprocess.run(['fc-list', ':family'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode == 0: fontconfig_fonts = result.stdout.split('\n') fontconfig_fonts = list(set(fontconfig_fonts)) # Remove duplicates matched_fonts = {} for font_file in FONT_PATHS.keys(): for fontconfig_font in fontconfig_fonts: if font_file.lower() in fontconfig_font.lower(): matched_fonts[font_file] = fontconfig_font.strip() # Parse and output the matched font names unique_font_names = set() for font in matched_fonts.values(): font_name = font.split(':')[1].strip() unique_font_names.add(font_name) # Remove duplicates from font_name and sort them alphabetically unique_font_names = sorted(list(set(unique_font_names))) # for font_name in unique_font_names: # print(font_name) else: logger.error(f"Error matching fonts: {result.stderr}") except Exception as e: logger.error(f"Exception while matching fonts: {str(e)}") match_fonts() def generate_style_line(options): """Generate ASS style line from options.""" style_options = { 'Name': 'Default', 'Fontname': options.get('font_name', 'Arial'), 'Fontsize': options.get('font_size', 12), 'PrimaryColour': options.get('primary_color', '&H00FFFFFF'), 'OutlineColour': options.get('outline_color', '&H00000000'), 'BackColour': options.get('back_color', '&H00000000'), 'Bold': options.get('bold', 0), 'Italic': options.get('italic', 0), 'Underline': options.get('underline', 0), 'StrikeOut': options.get('strikeout', 0), 'ScaleX': 100, 'ScaleY': 100, 'Spacing': 0, 'Angle': 0, 'BorderStyle': 1, 'Outline': options.get('outline', 1), 'Shadow': options.get('shadow', 0), 'Alignment': options.get('alignment', 2), 'MarginL': options.get('margin_l', 10), 'MarginR': options.get('margin_r', 10), 'MarginV': options.get('margin_v', 10), 'Encoding': options.get('encoding', 1) } return f"Style: {','.join(str(v) for v in style_options.values())}" def process_captioning(file_url, caption_srt, caption_type, options, job_id): """Process video captioning using FFmpeg.""" try: logger.info(f"Job {job_id}: Starting download of file from {file_url}") video_path = download_file(file_url, STORAGE_PATH) logger.info(f"Job {job_id}: File downloaded to {video_path}") subtitle_extension = '.' + caption_type srt_path = os.path.join(STORAGE_PATH, f"{job_id}{subtitle_extension}") options = convert_array_to_collection(options) caption_style = "" if caption_type == 'ass': style_string = generate_style_line(options) caption_style = f""" [Script Info] Title: Highlight Current Word ScriptType: v4.00+ [V4+ Styles] Format: Name, Fontname, Fontsize, PrimaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding {style_string} [Events] Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text """ logger.info(f"Job {job_id}: Generated ASS style string: {style_string}") if caption_srt.startswith("https"): # Download the file if caption_srt is a URL logger.info(f"Job {job_id}: Downloading caption file from {caption_srt}") response = requests.get(caption_srt) response.raise_for_status() # Raise an exception for bad status codes if caption_type in ['srt','vtt']: with open(srt_path, 'wb') as srt_file: srt_file.write(response.content) else: subtitle_content = caption_style + response.text with open(srt_path, 'w') as srt_file: srt_file.write(subtitle_content) logger.info(f"Job {job_id}: Caption file downloaded to {srt_path}") else: # Write caption_srt content directly to file subtitle_content = caption_style + caption_srt with open(srt_path, 'w') as srt_file: srt_file.write(subtitle_content) logger.info(f"Job {job_id}: SRT file created at {srt_path}") output_path = os.path.join(STORAGE_PATH, f"{job_id}_captioned.mp4") logger.info(f"Job {job_id}: Output path set to {output_path}") # Ensure font_name is converted to the full font path font_name = options.get('font_name', 'Arial') if font_name in FONT_PATHS: selected_font = FONT_PATHS[font_name] logger.info(f"Job {job_id}: Font path set to {selected_font}") else: selected_font = FONT_PATHS.get('Arial') logger.warning(f"Job {job_id}: Font {font_name} not found. Using default font Arial.") # For ASS subtitles, we should avoid overriding styles if subtitle_extension == '.ass': # Use the subtitles filter without force_style subtitle_filter = f"subtitles='{srt_path}'" logger.info(f"Job {job_id}: Using ASS subtitle filter: {subtitle_filter}") else: # Construct FFmpeg filter options for subtitles with detailed styling subtitle_filter = f"subtitles={srt_path}:force_style='" style_options = { 'FontName': font_name, # Use the font name instead of the font file path 'FontSize': options.get('font_size', 24), 'PrimaryColour': options.get('primary_color', '&H00FFFFFF'), 'SecondaryColour': options.get('secondary_color', '&H00000000'), 'OutlineColour': options.get('outline_color', '&H00000000'), 'BackColour': options.get('back_color', '&H00000000'), 'Bold': options.get('bold', 0), 'Italic': options.get('italic', 0), 'Underline': options.get('underline', 0), 'StrikeOut': options.get('strikeout', 0), 'Alignment': options.get('alignment', 2), 'MarginV': options.get('margin_v', 10), 'MarginL': options.get('margin_l', 10), 'MarginR': options.get('margin_r', 10), 'Outline': options.get('outline', 1), 'Shadow': options.get('shadow', 0), 'Blur': options.get('blur', 0), 'BorderStyle': options.get('border_style', 1), 'Encoding': options.get('encoding', 1), 'Spacing': options.get('spacing', 0), 'Angle': options.get('angle', 0), 'UpperCase': options.get('uppercase', 0) } # Add only populated options to the subtitle filter subtitle_filter += ','.join(f"{k}={v}" for k, v in style_options.items() if v is not None) subtitle_filter += "'" logger.info(f"Job {job_id}: Using subtitle filter: {subtitle_filter}") try: # Log the FFmpeg command for debugging logger.info(f"Job {job_id}: Running FFmpeg with filter: {subtitle_filter}") # Run FFmpeg to add subtitles to the video ffmpeg.input(video_path).output( output_path, vf=subtitle_filter, acodec='copy' ).run() logger.info(f"Job {job_id}: FFmpeg processing completed, output file at {output_path}") except ffmpeg.Error as e: # Log the FFmpeg stderr output if e.stderr: error_message = e.stderr.decode('utf8') else: error_message = 'Unknown FFmpeg error' logger.error(f"Job {job_id}: FFmpeg error: {error_message}") raise # The upload process will be handled by the calling function return output_path # Clean up local files os.remove(video_path) os.remove(srt_path) os.remove(output_path) logger.info(f"Job {job_id}: Local files cleaned up") except Exception as e: logger.error(f"Job {job_id}: Error in process_captioning: {str(e)}") raise def convert_array_to_collection(options): logger.info(f"Converting options array to dictionary: {options}") return {item["option"]: item["value"] for item in options}