| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
|
|
|
|
| import os |
| import ffmpeg |
| import logging |
| import requests |
| import subprocess |
| from services.file_management import download_file |
|
|
| |
| STORAGE_PATH = "/tmp/" |
|
|
| |
| logging.basicConfig(level=logging.INFO) |
| logger = logging.getLogger(__name__) |
|
|
| |
| FONTS_DIR = '/usr/share/fonts/custom' |
|
|
| |
| FONT_PATHS = {} |
| for font_file in os.listdir(FONTS_DIR): |
| if font_file.endswith('.ttf') or font_file.endswith('.TTF'): |
| font_name = os.path.splitext(font_file)[0] |
| FONT_PATHS[font_name] = os.path.join(FONTS_DIR, font_file) |
| |
|
|
| |
| ACCEPTABLE_FONTS = list(FONT_PATHS.keys()) |
| |
|
|
| |
| def match_fonts(): |
| try: |
| result = subprocess.run(['fc-list', ':family'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) |
| if result.returncode == 0: |
| fontconfig_fonts = result.stdout.split('\n') |
| fontconfig_fonts = list(set(fontconfig_fonts)) |
| matched_fonts = {} |
| for font_file in FONT_PATHS.keys(): |
| for fontconfig_font in fontconfig_fonts: |
| if font_file.lower() in fontconfig_font.lower(): |
| matched_fonts[font_file] = fontconfig_font.strip() |
|
|
| |
| unique_font_names = set() |
| for font in matched_fonts.values(): |
| font_name = font.split(':')[1].strip() |
| unique_font_names.add(font_name) |
| |
| |
| unique_font_names = sorted(list(set(unique_font_names))) |
| |
| |
| |
| else: |
| logger.error(f"Error matching fonts: {result.stderr}") |
| except Exception as e: |
| logger.error(f"Exception while matching fonts: {str(e)}") |
|
|
| match_fonts() |
|
|
| def generate_style_line(options): |
| """Generate ASS style line from options.""" |
| style_options = { |
| 'Name': 'Default', |
| 'Fontname': options.get('font_name', 'Arial'), |
| 'Fontsize': options.get('font_size', 12), |
| 'PrimaryColour': options.get('primary_color', '&H00FFFFFF'), |
| 'OutlineColour': options.get('outline_color', '&H00000000'), |
| 'BackColour': options.get('back_color', '&H00000000'), |
| 'Bold': options.get('bold', 0), |
| 'Italic': options.get('italic', 0), |
| 'Underline': options.get('underline', 0), |
| 'StrikeOut': options.get('strikeout', 0), |
| 'ScaleX': 100, |
| 'ScaleY': 100, |
| 'Spacing': 0, |
| 'Angle': 0, |
| 'BorderStyle': 1, |
| 'Outline': options.get('outline', 1), |
| 'Shadow': options.get('shadow', 0), |
| 'Alignment': options.get('alignment', 2), |
| 'MarginL': options.get('margin_l', 10), |
| 'MarginR': options.get('margin_r', 10), |
| 'MarginV': options.get('margin_v', 10), |
| 'Encoding': options.get('encoding', 1) |
| } |
| return f"Style: {','.join(str(v) for v in style_options.values())}" |
|
|
| def process_captioning(file_url, caption_srt, caption_type, options, job_id): |
| """Process video captioning using FFmpeg.""" |
| try: |
| logger.info(f"Job {job_id}: Starting download of file from {file_url}") |
| video_path = download_file(file_url, STORAGE_PATH) |
| logger.info(f"Job {job_id}: File downloaded to {video_path}") |
|
|
| subtitle_extension = '.' + caption_type |
| srt_path = os.path.join(STORAGE_PATH, f"{job_id}{subtitle_extension}") |
| options = convert_array_to_collection(options) |
| caption_style = "" |
|
|
| if caption_type == 'ass': |
| style_string = generate_style_line(options) |
| caption_style = f""" |
| [Script Info] |
| Title: Highlight Current Word |
| ScriptType: v4.00+ |
| [V4+ Styles] |
| Format: Name, Fontname, Fontsize, PrimaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding |
| {style_string} |
| [Events] |
| Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text |
| """ |
| logger.info(f"Job {job_id}: Generated ASS style string: {style_string}") |
|
|
| if caption_srt.startswith("https"): |
| |
| logger.info(f"Job {job_id}: Downloading caption file from {caption_srt}") |
| response = requests.get(caption_srt) |
| response.raise_for_status() |
| if caption_type in ['srt','vtt']: |
| with open(srt_path, 'wb') as srt_file: |
| srt_file.write(response.content) |
| else: |
| subtitle_content = caption_style + response.text |
| with open(srt_path, 'w') as srt_file: |
| srt_file.write(subtitle_content) |
| logger.info(f"Job {job_id}: Caption file downloaded to {srt_path}") |
| else: |
| |
| subtitle_content = caption_style + caption_srt |
| with open(srt_path, 'w') as srt_file: |
| srt_file.write(subtitle_content) |
| logger.info(f"Job {job_id}: SRT file created at {srt_path}") |
|
|
| output_path = os.path.join(STORAGE_PATH, f"{job_id}_captioned.mp4") |
| logger.info(f"Job {job_id}: Output path set to {output_path}") |
|
|
| |
| font_name = options.get('font_name', 'Arial') |
| if font_name in FONT_PATHS: |
| selected_font = FONT_PATHS[font_name] |
| logger.info(f"Job {job_id}: Font path set to {selected_font}") |
| else: |
| selected_font = FONT_PATHS.get('Arial') |
| logger.warning(f"Job {job_id}: Font {font_name} not found. Using default font Arial.") |
|
|
| |
| if subtitle_extension == '.ass': |
| |
| subtitle_filter = f"subtitles='{srt_path}'" |
| logger.info(f"Job {job_id}: Using ASS subtitle filter: {subtitle_filter}") |
| else: |
| |
| subtitle_filter = f"subtitles={srt_path}:force_style='" |
| style_options = { |
| 'FontName': font_name, |
| 'FontSize': options.get('font_size', 24), |
| 'PrimaryColour': options.get('primary_color', '&H00FFFFFF'), |
| 'SecondaryColour': options.get('secondary_color', '&H00000000'), |
| 'OutlineColour': options.get('outline_color', '&H00000000'), |
| 'BackColour': options.get('back_color', '&H00000000'), |
| 'Bold': options.get('bold', 0), |
| 'Italic': options.get('italic', 0), |
| 'Underline': options.get('underline', 0), |
| 'StrikeOut': options.get('strikeout', 0), |
| 'Alignment': options.get('alignment', 2), |
| 'MarginV': options.get('margin_v', 10), |
| 'MarginL': options.get('margin_l', 10), |
| 'MarginR': options.get('margin_r', 10), |
| 'Outline': options.get('outline', 1), |
| 'Shadow': options.get('shadow', 0), |
| 'Blur': options.get('blur', 0), |
| 'BorderStyle': options.get('border_style', 1), |
| 'Encoding': options.get('encoding', 1), |
| 'Spacing': options.get('spacing', 0), |
| 'Angle': options.get('angle', 0), |
| 'UpperCase': options.get('uppercase', 0) |
| } |
|
|
| |
| subtitle_filter += ','.join(f"{k}={v}" for k, v in style_options.items() if v is not None) |
| subtitle_filter += "'" |
| logger.info(f"Job {job_id}: Using subtitle filter: {subtitle_filter}") |
|
|
| try: |
| |
| logger.info(f"Job {job_id}: Running FFmpeg with filter: {subtitle_filter}") |
|
|
| |
| ffmpeg.input(video_path).output( |
| output_path, |
| vf=subtitle_filter, |
| acodec='copy' |
| ).run() |
| logger.info(f"Job {job_id}: FFmpeg processing completed, output file at {output_path}") |
| except ffmpeg.Error as e: |
| |
| if e.stderr: |
| error_message = e.stderr.decode('utf8') |
| else: |
| error_message = 'Unknown FFmpeg error' |
| logger.error(f"Job {job_id}: FFmpeg error: {error_message}") |
| raise |
|
|
| |
| return output_path |
|
|
| |
| os.remove(video_path) |
| os.remove(srt_path) |
| os.remove(output_path) |
| logger.info(f"Job {job_id}: Local files cleaned up") |
| except Exception as e: |
| logger.error(f"Job {job_id}: Error in process_captioning: {str(e)}") |
| raise |
|
|
| def convert_array_to_collection(options): |
| logger.info(f"Converting options array to dictionary: {options}") |
| return {item["option"]: item["value"] for item in options} |
|
|