# Copyright (c) 2025 Stephen G. Pope
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

import os
import ffmpeg
import logging
import subprocess
import whisper
from datetime import timedelta
import srt
import re

from services.file_management import download_file
from services.cloud_storage import upload_file  # Ensure this import is present
import requests  # Ensure requests is imported for webhook handling
from urllib.parse import urlparse
from config import LOCAL_STORAGE_PATH

# Initialize logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# Mapping from position keyword to the ASS \an alignment code (numpad layout).
POSITION_ALIGNMENT_MAP = {
    "bottom_left": 1, "bottom_center": 2, "bottom_right": 3,
    "middle_left": 4, "middle_center": 5, "middle_right": 6,
    "top_left": 7, "top_center": 8, "top_right": 9
}


def rgb_to_ass_color(rgb_color):
    """Convert RGB hex to ASS (&HAABBGGRR).

    ASS colors are stored blue-green-red with a leading alpha byte.
    Falls back to opaque white for anything that is not a 6-digit hex string.
    """
    if isinstance(rgb_color, str):
        rgb_color = rgb_color.lstrip('#')
        if len(rgb_color) == 6:
            r = int(rgb_color[0:2], 16)
            g = int(rgb_color[2:4], 16)
            b = int(rgb_color[4:6], 16)
            return f"&H00{b:02X}{g:02X}{r:02X}"
    return "&H00FFFFFF"


def generate_transcription(video_path, language='auto'):
    """Run Whisper on the video and return the transcription result dict.

    Word-level timestamps are always requested; a language is only passed
    through when the caller does not ask for auto-detection.
    """
    try:
        model = whisper.load_model("base")
        transcription_options = {
            'word_timestamps': True,
            'verbose': True,
        }
        if language != 'auto':
            transcription_options['language'] = language
        result = model.transcribe(video_path, **transcription_options)
        logger.info(f"Transcription generated successfully for video: {video_path}")
        return result
    except Exception as e:
        logger.error(f"Error in transcription: {str(e)}")
        raise


def get_video_resolution(video_path):
    """Probe the video with ffmpeg and return (width, height).

    Returns the default 384x288 when probing fails or no video stream exists,
    so downstream layout math always has a resolution to work with.
    """
    try:
        probe = ffmpeg.probe(video_path)
        video_streams = [s for s in probe['streams'] if s['codec_type'] == 'video']
        if video_streams:
            width = int(video_streams[0]['width'])
            height = int(video_streams[0]['height'])
            logger.info(f"Video resolution determined: {width}x{height}")
            return width, height
        else:
            logger.warning(f"No video streams found for {video_path}. Using default resolution 384x288.")
            return 384, 288
    except Exception as e:
        logger.error(f"Error getting video resolution: {str(e)}. Using default resolution 384x288.")
        return 384, 288


def get_available_fonts():
    """Get the list of available fonts on the system."""
    try:
        import matplotlib.font_manager as fm
    except ImportError:
        logger.error("matplotlib not installed. Install via 'pip install matplotlib'.")
        return []
    font_list = fm.findSystemFonts(fontpaths=None, fontext='ttf')
    font_names = set()
    for font in font_list:
        try:
            font_prop = fm.FontProperties(fname=font)
            font_name = font_prop.get_name()
            font_names.add(font_name)
        except Exception:
            # Some system fonts are unreadable/corrupt; skip them silently.
            continue
    logger.info(f"Available fonts retrieved: {font_names}")
    return list(font_names)


def format_ass_time(seconds):
    """Convert float seconds to ASS time format H:MM:SS.cc.

    FIX: round to total centiseconds first. The previous per-field rounding
    could produce 100 centiseconds (e.g. 1.999s -> "0:00:01.100"), which is
    not a valid ASS timestamp.
    """
    total_cs = int(round(seconds * 100))
    hours = total_cs // 360000
    minutes = (total_cs % 360000) // 6000
    secs = (total_cs % 6000) // 100
    centiseconds = total_cs % 100
    return f"{hours}:{minutes:02}:{secs:02}.{centiseconds:02}"


def process_subtitle_text(text, replace_dict, all_caps, max_words_per_line):
    """Apply text transformations: replacements, all caps, and optional line splitting."""
    for old_word, new_word in replace_dict.items():
        # Case-insensitive literal replacement (pattern is escaped).
        text = re.sub(re.escape(old_word), new_word, text, flags=re.IGNORECASE)
    if all_caps:
        text = text.upper()
    if max_words_per_line > 0:
        words = text.split()
        lines = [' '.join(words[i:i + max_words_per_line])
                 for i in range(0, len(words), max_words_per_line)]
        text = '\\N'.join(lines)  # \N is an ASS hard line break
    return text


def srt_to_transcription_result(srt_content):
    """Convert SRT content into a transcription-like structure for uniform processing."""
    subtitles = list(srt.parse(srt_content))
    segments = []
    for sub in subtitles:
        segments.append({
            'start': sub.start.total_seconds(),
            'end': sub.end.total_seconds(),
            'text': sub.content.strip(),
            'words': []  # SRT does not provide word-level timestamps
        })
    logger.info("Converted SRT content to transcription result.")
    return {'segments': segments}


def split_lines(text, max_words_per_line):
    """Split text into multiple lines if max_words_per_line > 0."""
    if max_words_per_line <= 0:
        return [text]
    words = text.split()
    lines = [' '.join(words[i:i + max_words_per_line])
             for i in range(0, len(words), max_words_per_line)]
    return lines


def is_url(string):
    """Check if the given string is a valid HTTP/HTTPS URL."""
    try:
        result = urlparse(string)
        return result.scheme in ('http', 'https')
    except Exception:  # FIX: narrowed from a bare except
        return False


def download_captions(captions_url):
    """Download captions from the given URL."""
    try:
        logger.info(f"Downloading captions from URL: {captions_url}")
        # FIX: added a timeout so a stalled remote server cannot hang the job forever.
        response = requests.get(captions_url, timeout=30)
        response.raise_for_status()
        logger.info("Captions downloaded successfully.")
        return response.text
    except Exception as e:
        logger.error(f"Error downloading captions: {str(e)}")
        raise


def determine_alignment_code(position_str, alignment_str, x, y, video_width, video_height):
    """
    Determine the final \\an alignment code and (x,y) position based on:
    - x,y (if provided)
    - position_str (one of top_left, top_center, ...)
    - alignment_str (left, center, right)
    - If x,y not provided, divide the video into a 3x3 grid and position accordingly.

    Returns (an_code, use_pos, final_x, final_y); use_pos is always True.
    """
    logger.info(f"[determine_alignment_code] Inputs: position_str={position_str}, alignment_str={alignment_str}, x={x}, y={y}, video_width={video_width}, video_height={video_height}")
    horizontal_map = {
        'left': 1,
        'center': 2,
        'right': 3
    }

    # If x and y are provided, use them directly and set \an based on alignment_str
    if x is not None and y is not None:
        logger.info("[determine_alignment_code] x and y provided, ignoring position and alignment for grid.")
        vertical_code = 4  # Middle row
        horiz_code = horizontal_map.get(alignment_str, 2)  # Default to center
        an_code = vertical_code + (horiz_code - 1)
        logger.info(f"[determine_alignment_code] Using provided x,y. an_code={an_code}")
        return an_code, True, x, y

    # No x,y provided: determine position and alignment based on grid
    pos_lower = position_str.lower()
    if 'top' in pos_lower:
        vertical_base = 7  # Top row an codes start at 7
        vertical_center = video_height / 6
    elif 'middle' in pos_lower:
        vertical_base = 4  # Middle row an codes start at 4
        vertical_center = video_height / 2
    else:
        vertical_base = 1  # Bottom row an codes start at 1
        vertical_center = (5 * video_height) / 6

    if 'left' in pos_lower:
        left_boundary = 0
        right_boundary = video_width / 3
        center_line = video_width / 6
    elif 'right' in pos_lower:
        left_boundary = (2 * video_width) / 3
        right_boundary = video_width
        center_line = (5 * video_width) / 6
    else:
        # Center column
        left_boundary = video_width / 3
        right_boundary = (2 * video_width) / 3
        center_line = video_width / 2

    # Alignment affects horizontal position within the cell
    if alignment_str == 'left':
        final_x = left_boundary
        horiz_code = 1
    elif alignment_str == 'right':
        final_x = right_boundary
        horiz_code = 3
    else:
        final_x = center_line
        horiz_code = 2

    final_y = vertical_center
    an_code = vertical_base + (horiz_code - 1)
    logger.info(f"[determine_alignment_code] Computed final_x={final_x}, final_y={final_y}, an_code={an_code}")
    return an_code, True, int(final_x), int(final_y)


def create_style_line(style_options, video_resolution):
    """
    Create the style line for ASS subtitles.

    Returns the "Style: Default,..." string, or a dict with 'error' and
    'available_fonts' keys when the requested font is not installed.
    """
    font_family = style_options.get('font_family', 'Arial')
    available_fonts = get_available_fonts()
    if font_family not in available_fonts:
        logger.warning(f"Font '{font_family}' not found.")
        return {'error': f"Font '{font_family}' not available.", 'available_fonts': available_fonts}

    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    secondary_color = line_color
    outline_color = rgb_to_ass_color(style_options.get('outline_color', '#000000'))
    box_color = rgb_to_ass_color(style_options.get('box_color', '#000000'))
    # Default font size scales with video height (5%).
    font_size = style_options.get('font_size', int(video_resolution[1] * 0.05))
    bold = '1' if style_options.get('bold', False) else '0'
    italic = '1' if style_options.get('italic', False) else '0'
    underline = '1' if style_options.get('underline', False) else '0'
    strikeout = '1' if style_options.get('strikeout', False) else '0'
    scale_x = style_options.get('scale_x', '100')
    scale_y = style_options.get('scale_y', '100')
    spacing = style_options.get('spacing', '0')
    angle = style_options.get('angle', '0')
    border_style = style_options.get('border_style', '1')
    outline_width = style_options.get('outline_width', '2')
    shadow_offset = style_options.get('shadow_offset', '0')
    margin_l = style_options.get('margin_l', '20')
    margin_r = style_options.get('margin_r', '20')
    margin_v = style_options.get('margin_v', '20')

    # Default alignment in style (we override per event)
    alignment = 5

    style_line = (
        f"Style: Default,{font_family},{font_size},{line_color},{secondary_color},"
        f"{outline_color},{box_color},{bold},{italic},{underline},{strikeout},"
        f"{scale_x},{scale_y},{spacing},{angle},{border_style},{outline_width},"
        f"{shadow_offset},{alignment},{margin_l},{margin_r},{margin_v},0"
    )
    logger.info(f"Created ASS style line: {style_line}")
    return style_line


def generate_ass_header(style_options, video_resolution):
    """
    Generate the ASS file header with the Default style.

    Propagates the font-error dict from create_style_line unchanged.
    """
    ass_header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {video_resolution[0]}
PlayResY: {video_resolution[1]}
ScaledBorderAndShadow: yes

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
"""
    style_line = create_style_line(style_options, video_resolution)
    if isinstance(style_line, dict) and 'error' in style_line:
        # Font-related error
        return style_line

    ass_header += style_line + "\n\n[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"
    logger.info("Generated ASS header.")
    return ass_header

### STYLE HANDLERS ###

def handle_classic(transcription_result, style_options, replace_dict, video_resolution):
    """
    Classic style handler: Centers the text based on position and alignment.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    x = style_options.get('x')
    y = style_options.get('y')

    an_code, use_pos, final_x, final_y = determine_alignment_code(
        position_str, alignment_str, x, y,
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )

    logger.info(f"[Classic] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    events = []
    for segment in transcription_result['segments']:
        text = segment['text'].strip().replace('\n', ' ')
        lines = split_lines(text, max_words_per_line)
        processed_text = '\\N'.join(process_subtitle_text(line, replace_dict, all_caps, 0) for line in lines)
        start_time = format_ass_time(segment['start'])
        end_time = format_ass_time(segment['end'])
        position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
        events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{processed_text}")
    logger.info(f"Handled {len(events)} dialogues in classic style.")
    return "\n".join(events)


def handle_karaoke(transcription_result, style_options, replace_dict, video_resolution):
    """
    Karaoke style handler: Highlights words as they are spoken.

    Uses ASS \\k timing tags (centisecond durations) per word.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    x = style_options.get('x')
    y = style_options.get('y')

    an_code, use_pos, final_x, final_y = determine_alignment_code(
        position_str, alignment_str, x, y,
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )

    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    logger.info(f"[Karaoke] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    events = []
    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            # Karaoke requires word timestamps; skip segments without them.
            continue

        if max_words_per_line > 0:
            lines_content = []
            current_line = []
            current_line_words = 0
            for w_info in words:
                w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
                duration_cs = int(round((w_info['end'] - w_info['start']) * 100))
                highlighted_word = f"{{\\k{duration_cs}}}{w} "
                current_line.append(highlighted_word)
                current_line_words += 1
                if current_line_words >= max_words_per_line:
                    lines_content.append(''.join(current_line).strip())
                    current_line = []
                    current_line_words = 0
            if current_line:
                lines_content.append(''.join(current_line).strip())
        else:
            line_content = []
            for w_info in words:
                w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
                duration_cs = int(round((w_info['end'] - w_info['start']) * 100))
                highlighted_word = f"{{\\k{duration_cs}}}{w} "
                line_content.append(highlighted_word)
            lines_content = [''.join(line_content).strip()]

        dialogue_text = '\\N'.join(lines_content)
        start_time = format_ass_time(words[0]['start'])
        end_time = format_ass_time(words[-1]['end'])
        position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
        events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{word_color}}}{dialogue_text}")
    logger.info(f"Handled {len(events)} dialogues in karaoke style.")
    return "\n".join(events)


def handle_highlight(transcription_result, style_options, replace_dict, video_resolution):
    """
    Highlight style handler: Highlights words sequentially.

    Emits a persistent base line (layer 0) plus one overlay event per word
    (layer 1) with the current word colored differently.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    x = style_options.get('x')
    y = style_options.get('y')

    an_code, use_pos, final_x, final_y = determine_alignment_code(
        position_str, alignment_str, x, y,
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )

    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    events = []
    logger.info(f"[Highlight] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue

        # Process all words in the segment
        processed_words = []
        for w_info in words:
            w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
            if w:
                processed_words.append((w, w_info['start'], w_info['end']))

        if not processed_words:
            continue

        # Split into lines if max_words_per_line is specified
        if max_words_per_line > 0:
            line_sets = [processed_words[i:i + max_words_per_line]
                         for i in range(0, len(processed_words), max_words_per_line)]
        else:
            line_sets = [processed_words]

        for line_set in line_sets:
            # Get the start time of the first word and end time of the last word
            line_start = line_set[0][1]
            line_end = line_set[-1][2]

            # Create a persistent line that stays visible during the entire segment
            base_text = ' '.join(word for word, _, _ in line_set)
            start_time = format_ass_time(line_start)
            end_time = format_ass_time(line_end)
            position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
            events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{base_text}")

            # Add individual highlighting for each word
            for idx, (word, w_start, w_end) in enumerate(line_set):
                # Create the highlighted version of this word within the line
                highlighted_words = []
                for i, (w, _, _) in enumerate(line_set):
                    if i == idx:
                        # This is the current word - highlight it
                        highlighted_words.append(f"{{\\c{word_color}}}{w}{{\\c{line_color}}}")
                    else:
                        # Add the word without highlighting
                        highlighted_words.append(w)

                highlighted_text = ' '.join(highlighted_words)
                word_start_time = format_ass_time(w_start)
                word_end_time = format_ass_time(w_end)
                events.append(f"Dialogue: 1,{word_start_time},{word_end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{highlighted_text}")

    logger.info(f"Handled {len(events)} dialogues in highlight style.")
    return "\n".join(events)


def handle_underline(transcription_result, style_options, replace_dict, video_resolution):
    """
    Underline style handler: Underlines the current word.

    One event per word, with the active word wrapped in \\u1 ... \\u0 tags.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    x = style_options.get('x')
    y = style_options.get('y')

    an_code, use_pos, final_x, final_y = determine_alignment_code(
        position_str, alignment_str, x, y,
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )

    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    events = []
    logger.info(f"[Underline] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue

        processed_words = []
        for w_info in words:
            w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
            if w:
                processed_words.append((w, w_info['start'], w_info['end']))

        if not processed_words:
            continue

        if max_words_per_line > 0:
            line_sets = [processed_words[i:i + max_words_per_line]
                         for i in range(0, len(processed_words), max_words_per_line)]
        else:
            line_sets = [processed_words]

        for line_set in line_sets:
            for idx, (word, w_start, w_end) in enumerate(line_set):
                line_words = []
                for w_idx, (w_text, _, _) in enumerate(line_set):
                    if w_idx == idx:
                        line_words.append(f"{{\\u1}}{w_text}{{\\u0}}")
                    else:
                        line_words.append(w_text)
                full_text = ' '.join(line_words)
                start_time = format_ass_time(w_start)
                end_time = format_ass_time(w_end)
                position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
                events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{full_text}")
    logger.info(f"Handled {len(events)} dialogues in underline style.")
    return "\n".join(events)


def handle_word_by_word(transcription_result, style_options, replace_dict, video_resolution):
    """
    Word-by-Word style handler: Displays each word individually.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    x = style_options.get('x')
    y = style_options.get('y')

    an_code, use_pos, final_x, final_y = determine_alignment_code(
        position_str, alignment_str, x, y,
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )

    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    events = []
    logger.info(f"[Word-by-Word] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")

    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue

        if max_words_per_line > 0:
            grouped_words = [words[i:i + max_words_per_line]
                             for i in range(0, len(words), max_words_per_line)]
        else:
            grouped_words = [words]

        for word_group in grouped_words:
            for w_info in word_group:
                w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
                if not w:
                    continue
                start_time = format_ass_time(w_info['start'])
                end_time = format_ass_time(w_info['end'])
                position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
                events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{word_color}}}{w}")
    logger.info(f"Handled {len(events)} dialogues in word-by-word style.")
    return "\n".join(events)


# Dispatch table mapping style name -> handler function.
STYLE_HANDLERS = {
    'classic': handle_classic,
    'karaoke': handle_karaoke,
    'highlight': handle_highlight,
    'underline': handle_underline,
    'word_by_word': handle_word_by_word
}


def srt_to_ass(transcription_result, style_type, settings, replace_dict, video_resolution):
    """
    Convert transcription result to ASS based on the specified style.

    Returns the full ASS document string, or a dict with an 'error' key
    when the header could not be generated (font not available).
    """
    default_style_settings = {
        'line_color': '#FFFFFF',
        'word_color': '#FFFF00',
        'box_color': '#000000',
        'outline_color': '#000000',
        'all_caps': False,
        'max_words_per_line': 0,
        'font_size': None,
        'font_family': 'Arial',
        'bold': False,
        'italic': False,
        'underline': False,
        'strikeout': False,
        'outline_width': 2,
        'shadow_offset': 0,
        'border_style': 1,
        'x': None,
        'y': None,
        'position': 'middle_center',
        'alignment': 'center'  # default alignment
    }
    style_options = {**default_style_settings, **settings}

    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    ass_header = generate_ass_header(style_options, video_resolution)
    if isinstance(ass_header, dict) and 'error' in ass_header:
        # Font-related error
        return ass_header

    handler = STYLE_HANDLERS.get(style_type.lower())
    if not handler:
        logger.warning(f"Unknown style '{style_type}', defaulting to 'classic'.")
        handler = handle_classic
    dialogue_lines = handler(transcription_result, style_options, replace_dict, video_resolution)

    logger.info("Converted transcription result to ASS format.")
    return ass_header + dialogue_lines + "\n"


def process_subtitle_events(transcription_result, style_type, settings, replace_dict, video_resolution):
    """
    Process transcription results into ASS subtitle format.
    """
    return srt_to_ass(transcription_result, style_type, settings, replace_dict, video_resolution)


def parse_time_string(time_str):
    """Parse a time string in hh:mm:ss.ms or mm:ss.ms or ss.ms format to seconds (float)."""
    # NOTE: 're' is the module-level import; the redundant local import was removed.
    if not isinstance(time_str, str):
        raise ValueError("Time value must be a string in hh:mm:ss.ms format.")
    pattern = r"^(?:(\d+):)?(\d{1,2}):(\d{2}(?:\.\d{1,3})?)$"
    match = re.match(pattern, time_str)
    if not match:
        # Try ss.ms only
        try:
            return float(time_str)
        except Exception:
            raise ValueError(f"Invalid time string: {time_str}")
    h, m, s = match.groups(default="0")
    total_seconds = int(h) * 3600 + int(m) * 60 + float(s)
    return total_seconds


def filter_subtitle_lines(sub_content, exclude_time_ranges, subtitle_type):
    """
    Remove subtitle lines/blocks that overlap with exclude_time_ranges.
    Supports 'ass' and 'srt' subtitle_type.
    """
    # FIX: early-return on empty ranges before doing any parsing work.
    if not exclude_time_ranges:
        return sub_content

    def parse_ass_time(ass_time):
        # ASS timestamps are H:MM:SS.cc; fall back to 0 on malformed input.
        try:
            h, m, rest = ass_time.split(":")
            s, cs = rest.split(".")
            return int(h) * 3600 + int(m) * 60 + int(s) + int(cs) / 100
        except Exception:
            return 0

    def parse_time_range(rng):
        start = parse_time_string(rng['start'])
        end = parse_time_string(rng['end'])
        return {'start': start, 'end': end}

    parsed_ranges = [parse_time_range(rng) for rng in exclude_time_ranges]

    if subtitle_type == 'ass':
        lines = sub_content.splitlines()
        filtered_lines = []
        for line in lines:
            if line.startswith("Dialogue:"):
                parts = line.split(",", 10)
                if len(parts) > 3:
                    start = parse_ass_time(parts[1])
                    end = parse_ass_time(parts[2])
                    overlap = False
                    for rng in parsed_ranges:
                        # Half-open interval overlap test
                        if start < rng['end'] and end > rng['start']:
                            overlap = True
                            break
                    if overlap:
                        continue
            filtered_lines.append(line)
        return "\n".join(filtered_lines)
    elif subtitle_type == 'srt':
        subtitles = list(srt.parse(sub_content))
        filtered = []
        for sub in subtitles:
            start = sub.start.total_seconds()
            end = sub.end.total_seconds()
            overlap = False
            for rng in parsed_ranges:
                if start < rng['end'] and end > rng['start']:
                    overlap = True
                    break
            if not overlap:
                filtered.append(sub)
        return srt.compose(filtered)
    else:
        # Unknown subtitle type: leave content untouched.
        return sub_content


def normalize_exclude_time_ranges(exclude_time_ranges):
    """Validate exclude_time_ranges entries; returns the (string) ranges unchanged.

    Raises ValueError when an entry is malformed, negative, or empty (end <= start).
    """
    norm = []
    for rng in exclude_time_ranges:
        start = rng.get("start")
        end = rng.get("end")
        if not isinstance(start, str) or not isinstance(end, str):
            raise ValueError("exclude_time_ranges start/end must be strings in hh:mm:ss.ms format.")
        start_sec = parse_time_string(start)
        end_sec = parse_time_string(end)
        if start_sec < 0 or end_sec < 0:
            raise ValueError("exclude_time_ranges start/end must be non-negative.")
        if end_sec <= start_sec:
            raise ValueError("exclude_time_ranges end must be strictly greater than start.")
        norm.append({"start": start, "end": end})
    return norm


def generate_ass_captions_v1(video_url, captions, settings, replace, exclude_time_ranges, job_id, language='auto', PlayResX=None, PlayResY=None):
    """
    Captioning process with transcription fallback and multiple styles.
    Integrates with the updated logic for positioning and alignment.
    If PlayResX and PlayResY are provided, use them for ASS generation; otherwise, get from video.

    Returns the local subtitle file path on success, or a dict with an
    'error' key (plus 'available_fonts' for font errors) on failure.
    """
    try:
        # Validate exclude_time_ranges up front (raises ValueError on bad input)
        if exclude_time_ranges:
            exclude_time_ranges = normalize_exclude_time_ranges(exclude_time_ranges)

        if not isinstance(settings, dict):
            logger.error(f"Job {job_id}: 'settings' should be a dictionary.")
            return {"error": "'settings' should be a dictionary."}

        # Normalize keys by replacing hyphens with underscores
        style_options = {k.replace('-', '_'): v for k, v in settings.items()}

        if not isinstance(replace, list):
            logger.error(f"Job {job_id}: 'replace' should be a list of objects with 'find' and 'replace' keys.")
            return {"error": "'replace' should be a list of objects with 'find' and 'replace' keys."}

        # Convert 'replace' list to dictionary
        replace_dict = {}
        for item in replace:
            if 'find' in item and 'replace' in item:
                replace_dict[item['find']] = item['replace']
            else:
                logger.warning(f"Job {job_id}: Invalid replace item {item}. Skipping.")

        # Handle deprecated 'highlight_color' by merging it into 'word_color'
        if 'highlight_color' in style_options:
            logger.warning(f"Job {job_id}: 'highlight_color' is deprecated; merging into 'word_color'.")
            style_options['word_color'] = style_options.pop('highlight_color')

        # Check font availability
        font_family = style_options.get('font_family', 'Arial')
        available_fonts = get_available_fonts()
        if font_family not in available_fonts:
            logger.warning(f"Job {job_id}: Font '{font_family}' not found.")
            # Return font error with available_fonts
            return {"error": f"Font '{font_family}' not available.", "available_fonts": available_fonts}
        logger.info(f"Job {job_id}: Font '{font_family}' is available.")

        # Determine if captions is a URL or raw content
        if captions and is_url(captions):
            logger.info(f"Job {job_id}: Captions provided as URL. Downloading captions.")
            try:
                captions_content = download_captions(captions)
            except Exception as e:
                logger.error(f"Job {job_id}: Failed to download captions: {str(e)}")
                return {"error": f"Failed to download captions: {str(e)}"}
        elif captions:
            logger.info(f"Job {job_id}: Captions provided as raw content.")
            captions_content = captions
        else:
            captions_content = None

        # Download the video
        try:
            video_path = download_file(video_url, LOCAL_STORAGE_PATH)
            logger.info(f"Job {job_id}: Video downloaded to {video_path}")
        except Exception as e:
            logger.error(f"Job {job_id}: Video download error: {str(e)}")
            # For non-font errors, do NOT include available_fonts
            return {"error": str(e)}

        # Get video resolution, unless provided
        if PlayResX is not None and PlayResY is not None:
            video_resolution = (PlayResX, PlayResY)
            logger.info(f"Job {job_id}: Using provided PlayResX/PlayResY = {PlayResX}x{PlayResY}")
        else:
            video_resolution = get_video_resolution(video_path)
            logger.info(f"Job {job_id}: Video resolution detected = {video_resolution[0]}x{video_resolution[1]}")

        # Determine style type
        style_type = style_options.get('style', 'classic').lower()
        logger.info(f"Job {job_id}: Using style '{style_type}' for captioning.")

        # Determine subtitle content
        if captions_content:
            # Check if it's ASS by looking for '[Script Info]'
            if '[Script Info]' in captions_content:
                # It's ASS directly
                subtitle_content = captions_content
                subtitle_type = 'ass'
                logger.info(f"Job {job_id}: Detected ASS formatted captions.")
            else:
                # Treat as SRT
                logger.info(f"Job {job_id}: Detected SRT formatted captions.")
                # Validate style for SRT
                if style_type != 'classic':
                    error_message = "Only 'classic' style is supported for SRT captions."
                    logger.error(f"Job {job_id}: {error_message}")
                    return {"error": error_message}
                transcription_result = srt_to_transcription_result(captions_content)
                # Generate ASS based on chosen style
                subtitle_content = process_subtitle_events(transcription_result, style_type, style_options, replace_dict, video_resolution)
                subtitle_type = 'ass'
        else:
            # No captions provided, generate transcription
            logger.info(f"Job {job_id}: No captions provided, generating transcription.")
            transcription_result = generate_transcription(video_path, language=language)
            # Generate ASS based on chosen style
            subtitle_content = process_subtitle_events(transcription_result, style_type, style_options, replace_dict, video_resolution)
            subtitle_type = 'ass'

        # Check for subtitle processing errors
        if isinstance(subtitle_content, dict) and 'error' in subtitle_content:
            logger.error(f"Job {job_id}: {subtitle_content['error']}")
            # Only include 'available_fonts' if it's a font-related error
            if 'available_fonts' in subtitle_content:
                return {"error": subtitle_content['error'], "available_fonts": subtitle_content.get('available_fonts', [])}
            else:
                return {"error": subtitle_content['error']}

        # After subtitle_content is generated and before saving to file:
        if exclude_time_ranges:
            subtitle_content = filter_subtitle_lines(subtitle_content, exclude_time_ranges, subtitle_type)
            if subtitle_type == 'ass':
                logger.info(f"Job {job_id}: Filtered ASS Dialogue lines due to exclude_time_ranges.")
            elif subtitle_type == 'srt':
                logger.info(f"Job {job_id}: Filtered SRT subtitle blocks due to exclude_time_ranges.")

        # Save the subtitle content
        subtitle_filename = f"{job_id}.{subtitle_type}"
        subtitle_path = os.path.join(LOCAL_STORAGE_PATH, subtitle_filename)
        try:
            with open(subtitle_path, 'w', encoding='utf-8') as f:
                f.write(subtitle_content)
            logger.info(f"Job {job_id}: Subtitle file saved to {subtitle_path}")
        except Exception as e:
            logger.error(f"Job {job_id}: Failed to save subtitle file: {str(e)}")
            return {"error": f"Failed to save subtitle file: {str(e)}"}

        return subtitle_path
    except Exception as e:
        logger.error(f"Job {job_id}: Error in generate_ass_captions_v1: {str(e)}", exc_info=True)
        return {"error": str(e)}