# Source: nca-toolkit/services/ass_toolkit.py
# Last commit: 4b12e15 ("fix issues") by jananathbanuka
# Copyright (c) 2025 Stephen G. Pope
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import ffmpeg
import logging
import subprocess
import whisper
from datetime import timedelta
import srt
import re
from services.file_management import download_file
from services.cloud_storage import upload_file # Ensure this import is present
import requests # Ensure requests is imported for webhook handling
from urllib.parse import urlparse
from config import LOCAL_STORAGE_PATH
# Module-level logger for this service. A stream handler is attached only
# when no handler is reachable yet, so re-imports do not duplicate output.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
# Maps "<row>_<column>" position names to ASS numpad alignment codes:
# bottom row is 1-3, middle row is 4-6, top row is 7-9 (left to right).
POSITION_ALIGNMENT_MAP = {
    f"{row_name}_{column_name}": 3 * row_index + column_index + 1
    for row_index, row_name in enumerate(("bottom", "middle", "top"))
    for column_index, column_name in enumerate(("left", "center", "right"))
}
def rgb_to_ass_color(rgb_color):
    """Convert an ``RRGGBB`` hex colour to the ASS ``&HAABBGGRR`` form.

    Args:
        rgb_color: Colour string such as ``"#FF8000"`` or ``"ff8000"``.
            Leading ``#`` characters are ignored.

    Returns:
        The ASS colour string with a fully opaque alpha (``00``) and the
        channels in blue-green-red order. Anything that is not a string of
        exactly six hexadecimal digits yields opaque white
        (``"&H00FFFFFF"``) instead of raising.
    """
    fallback = "&H00FFFFFF"
    if not isinstance(rgb_color, str):
        return fallback
    hex_digits = rgb_color.lstrip('#')
    # Validate explicitly: int(..., 16) would raise on "GG" and would
    # accept odd forms such as "+1", neither of which is a colour.
    if not re.fullmatch(r"[0-9A-Fa-f]{6}", hex_digits):
        return fallback
    r = int(hex_digits[0:2], 16)
    g = int(hex_digits[2:4], 16)
    b = int(hex_digits[4:6], 16)
    return f"&H00{b:02X}{g:02X}{r:02X}"
def generate_transcription(video_path, language='auto'):
    """Transcribe ``video_path`` with the Whisper "base" model.

    Word-level timestamps are requested. ``language`` is forwarded to
    Whisper unless it is ``'auto'``. Any failure is logged and re-raised.
    """
    try:
        transcription_options = dict(word_timestamps=True, verbose=True)
        if language != 'auto':
            transcription_options.update(language=language)
        result = whisper.load_model("base").transcribe(video_path, **transcription_options)
        logger.info(f"Transcription generated successfully for video: {video_path}")
        return result
    except Exception as e:
        logger.error(f"Error in transcription: {str(e)}")
        raise
def get_video_resolution(video_path):
    """Return ``(width, height)`` of the first video stream reported by ffprobe.

    Falls back to ``(384, 288)`` when probing fails or the file has no
    video stream.
    """
    try:
        streams = ffmpeg.probe(video_path)['streams']
        video_streams = [stream for stream in streams if stream['codec_type'] == 'video']
        if not video_streams:
            logger.warning(f"No video streams found for {video_path}. Using default resolution 384x288.")
            return 384, 288
        primary = video_streams[0]
        width = int(primary['width'])
        height = int(primary['height'])
        logger.info(f"Video resolution determined: {width}x{height}")
        return width, height
    except Exception as e:
        logger.error(f"Error getting video resolution: {str(e)}. Using default resolution 384x288.")
        return 384, 288
def get_available_fonts():
    """Return the family names of the fonts matplotlib can find on this system.

    Returns an empty list (after logging an error) when matplotlib is not
    installed. Font files whose name cannot be read are skipped.
    """
    try:
        import matplotlib.font_manager as fm
    except ImportError:
        logger.error("matplotlib not installed. Install via 'pip install matplotlib'.")
        return []
    font_names = set()
    for font_path in fm.findSystemFonts(fontpaths=None, fontext='ttf'):
        try:
            font_names.add(fm.FontProperties(fname=font_path).get_name())
        except Exception:
            # Unreadable or corrupt font file: ignore it and keep scanning.
            continue
    logger.info(f"Available fonts retrieved: {font_names}")
    return list(font_names)
def format_ass_time(seconds):
    """Convert float seconds to the ASS timestamp format ``H:MM:SS.cc``.

    The value is rounded to whole centiseconds first and then split into
    fields, so a fraction that rounds up (e.g. ``1.999``) carries into the
    seconds/minutes/hours instead of producing an invalid ``.100`` field.
    Negative inputs are clamped to ``0:00:00.00``.
    """
    total_centiseconds = max(0, int(round(seconds * 100)))
    total_seconds, centiseconds = divmod(total_centiseconds, 100)
    total_minutes, secs = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours}:{minutes:02}:{secs:02}.{centiseconds:02}"
def process_subtitle_text(text, replace_dict, all_caps, max_words_per_line):
    r"""Apply replacements, optional upper-casing and optional line splitting.

    Args:
        text: Subtitle text to transform.
        replace_dict: Mapping of ``find -> replacement``. Matching is literal
            and case-insensitive; the replacement is inserted verbatim.
            Entries with an empty ``find`` are ignored.
        all_caps: When truthy, upper-case the text after replacements.
        max_words_per_line: When greater than zero, re-wrap the text so each
            line holds at most this many words, joined with the ASS line
            break ``\N``.

    Returns:
        The transformed text.
    """
    for old_word, new_word in replace_dict.items():
        if not old_word:
            # An empty pattern matches between every character and would
            # splice the replacement throughout the text.
            continue
        # A callable replacement keeps backslashes and group references in
        # new_word literal instead of being read as a regex template.
        text = re.sub(
            re.escape(old_word),
            lambda _match, _replacement=new_word: _replacement,
            text,
            flags=re.IGNORECASE,
        )
    if all_caps:
        text = text.upper()
    if max_words_per_line > 0:
        words = text.split()
        lines = [' '.join(words[i:i + max_words_per_line]) for i in range(0, len(words), max_words_per_line)]
        text = '\\N'.join(lines)
    return text
def srt_to_transcription_result(srt_content):
    """Wrap parsed SRT cues in the ``{'segments': [...]}`` shape the style handlers read.

    Each segment carries ``start``/``end`` in seconds, the stripped cue
    text, and an empty ``words`` list because SRT has no word-level timing.
    """
    segments = [
        {
            'start': cue.start.total_seconds(),
            'end': cue.end.total_seconds(),
            'text': cue.content.strip(),
            'words': [],  # SRT does not provide word-level timestamps
        }
        for cue in srt.parse(srt_content)
    ]
    logger.info("Converted SRT content to transcription result.")
    return {'segments': segments}
def split_lines(text, max_words_per_line):
    """Break ``text`` into chunks of at most ``max_words_per_line`` words.

    A non-positive limit disables splitting and returns ``[text]`` untouched.
    """
    if max_words_per_line <= 0:
        return [text]
    tokens = text.split()
    chunks = []
    for offset in range(0, len(tokens), max_words_per_line):
        chunks.append(' '.join(tokens[offset:offset + max_words_per_line]))
    return chunks
def is_url(string):
    """Return True when ``string`` is an HTTP/HTTPS URL with a host part.

    Non-string values, strings that ``urlparse`` rejects, other schemes and
    scheme-only values such as ``"http://"`` all yield False.
    """
    if not isinstance(string, str):
        return False
    try:
        parsed = urlparse(string)
    except ValueError:
        # urlparse raises ValueError for malformed input (e.g. a bad IPv6 host).
        return False
    return parsed.scheme in ('http', 'https') and bool(parsed.netloc)
def download_captions(captions_url, timeout=30):
    """Download caption text from ``captions_url``.

    Args:
        captions_url: HTTP/HTTPS location of the caption file.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The response body decoded as text.

    Raises:
        Exception: Any error from ``requests`` (connection failure, timeout,
            non-2xx status) is logged and re-raised.
    """
    try:
        logger.info(f"Downloading captions from URL: {captions_url}")
        response = requests.get(captions_url, timeout=timeout)
        response.raise_for_status()
        logger.info("Captions downloaded successfully.")
        return response.text
    except Exception as e:
        logger.error(f"Error downloading captions: {str(e)}")
        raise
def determine_alignment_code(position_str, alignment_str, x, y, video_width, video_height):
    r"""Work out the ``\an`` alignment code and ``\pos`` coordinates for a caption.

    When both ``x`` and ``y`` are given they are returned unchanged and the
    code comes from the middle row (4-6), selected by ``alignment_str``
    ('left', 'center' or 'right'; anything else means center).

    Otherwise the frame is treated as a 3x3 grid. ``position_str`` (e.g.
    'top_left') selects the cell: a name without 'top'/'middle' means the
    bottom row, and a name without 'left'/'right' means the center column.
    ``alignment_str`` then anchors the text on the cell's left edge, right
    edge or center line.

    Returns:
        ``(an_code, True, x, y)``; the grid path returns integer coordinates.
    """
    logger.info(f"[determine_alignment_code] Inputs: position_str={position_str}, alignment_str={alignment_str}, x={x}, y={y}, video_width={video_width}, video_height={video_height}")

    # Explicit coordinates take precedence over the grid.
    if not (x is None or y is None):
        logger.info("[determine_alignment_code] x and y provided, ignoring position and alignment for grid.")
        horiz_code = {'left': 1, 'center': 2, 'right': 3}.get(alignment_str, 2)
        an_code = 4 + (horiz_code - 1)  # 4 is the first code of the middle row
        logger.info(f"[determine_alignment_code] Using provided x,y. an_code={an_code}")
        return an_code, True, x, y

    pos_lower = position_str.lower()

    # Row: first \an code of the row and the vertical centre of that third.
    if 'top' in pos_lower:
        vertical_base, final_y = 7, video_height / 6
    elif 'middle' in pos_lower:
        vertical_base, final_y = 4, video_height / 2
    else:
        vertical_base, final_y = 1, (5 * video_height) / 6

    # Column: (left edge, right edge, centre line) of the selected third.
    if 'left' in pos_lower:
        cell = (0, video_width / 3, video_width / 6)
    elif 'right' in pos_lower:
        cell = ((2 * video_width) / 3, video_width, (5 * video_width) / 6)
    else:
        cell = (video_width / 3, (2 * video_width) / 3, video_width / 2)
    left_boundary, right_boundary, center_line = cell

    # Alignment picks the anchor point within the cell.
    if alignment_str == 'left':
        final_x, horiz_code = left_boundary, 1
    elif alignment_str == 'right':
        final_x, horiz_code = right_boundary, 3
    else:
        final_x, horiz_code = center_line, 2

    an_code = vertical_base + (horiz_code - 1)
    logger.info(f"[determine_alignment_code] Computed final_x={final_x}, final_y={final_y}, an_code={an_code}")
    return an_code, True, int(final_x), int(final_y)
def create_style_line(style_options, video_resolution):
    """Build the ``Style: Default,...`` line of the ASS ``[V4+ Styles]`` section.

    Returns the style line as a string, or a dict with ``error`` and
    ``available_fonts`` keys when the requested font family is not installed.
    The font size defaults to 5% of the video height.
    """
    font_family = style_options.get('font_family', 'Arial')
    available_fonts = get_available_fonts()
    if font_family not in available_fonts:
        logger.warning(f"Font '{font_family}' not found.")
        return {'error': f"Font '{font_family}' not available.", 'available_fonts': available_fonts}

    # Colours; the secondary colour simply mirrors the primary one.
    line_color, outline_color, box_color = (
        rgb_to_ass_color(style_options.get(key, fallback))
        for key, fallback in (
            ('line_color', '#FFFFFF'),
            ('outline_color', '#000000'),
            ('box_color', '#000000'),
        )
    )
    secondary_color = line_color

    font_size = style_options.get('font_size', int(video_resolution[1] * 0.05))

    # Boolean switches are written as '1' / '0'.
    bold, italic, underline, strikeout = (
        '1' if style_options.get(key, False) else '0'
        for key in ('bold', 'italic', 'underline', 'strikeout')
    )

    # Remaining numeric fields, each with its default.
    (scale_x, scale_y, spacing, angle, border_style,
     outline_width, shadow_offset, margin_l, margin_r, margin_v) = (
        style_options.get(key, fallback)
        for key, fallback in (
            ('scale_x', '100'),
            ('scale_y', '100'),
            ('spacing', '0'),
            ('angle', '0'),
            ('border_style', '1'),
            ('outline_width', '2'),
            ('shadow_offset', '0'),
            ('margin_l', '20'),
            ('margin_r', '20'),
            ('margin_v', '20'),
        )
    )

    # Default alignment in style (we override per event)
    alignment = 5
    style_line = (
        f"Style: Default,{font_family},{font_size},{line_color},{secondary_color},"
        f"{outline_color},{box_color},{bold},{italic},{underline},{strikeout},"
        f"{scale_x},{scale_y},{spacing},{angle},{border_style},{outline_width},"
        f"{shadow_offset},{alignment},{margin_l},{margin_r},{margin_v},0"
    )
    logger.info(f"Created ASS style line: {style_line}")
    return style_line
def generate_ass_header(style_options, video_resolution):
    """Assemble the ASS preamble: script info, the Default style and the events format line.

    Returns the header text, or passes through the error dict produced by
    ``create_style_line`` when the font is unavailable.
    """
    ass_header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {video_resolution[0]}
PlayResY: {video_resolution[1]}
ScaledBorderAndShadow: yes
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
"""
    style_line = create_style_line(style_options, video_resolution)
    font_problem = isinstance(style_line, dict) and 'error' in style_line
    if font_problem:
        # Font-related error: hand the dict back to the caller unchanged.
        return style_line
    events_preamble = "\n\n[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"
    ass_header = ass_header + (style_line + events_preamble)
    logger.info("Generated ASS header.")
    return ass_header
### STYLE HANDLERS ###
def handle_classic(transcription_result, style_options, replace_dict, video_resolution):
    r"""Classic style: one Dialogue event per segment, showing the whole segment text.

    The text is wrapped to ``max_words_per_line`` words per line (when
    positive), run through the replacement / all-caps transformations line
    by line, and pinned with ``\an`` + ``\pos`` computed from the position
    options. Returns the events joined by newlines.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    # No explicit size: use 5% of the frame height (written back to style_options).
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)
    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    an_code, _, final_x, final_y = determine_alignment_code(
        position_str,
        alignment_str,
        style_options.get('x'),
        style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1],
    )
    logger.info(f"[Classic] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")
    # The override tag is identical for every event, so build it once.
    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"

    def render(segment):
        text = segment['text'].strip().replace('\n', ' ')
        processed_text = '\\N'.join(
            process_subtitle_text(line, replace_dict, all_caps, 0)
            for line in split_lines(text, max_words_per_line)
        )
        start_time = format_ass_time(segment['start'])
        end_time = format_ass_time(segment['end'])
        return f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{processed_text}"

    events = [render(segment) for segment in transcription_result['segments']]
    logger.info(f"Handled {len(events)} dialogues in classic style.")
    return "\n".join(events)
def handle_karaoke(transcription_result, style_options, replace_dict, video_resolution):
    r"""Karaoke style: one Dialogue per segment with ``\k`` timing on every word.

    Each word is preceded by ``{\k<centiseconds>}``. ``\k`` durations are
    consumed back to back from the start of the Dialogue, so a pause
    between two words is emitted as an extra, text-less ``{\k<gap>}`` tag;
    without it every pause would make the highlighting run ahead of the
    audio. Offsets are rounded cumulatively from the line start so that
    rounding errors do not accumulate across a long segment.

    Segments without word-level timestamps are skipped. When
    ``max_words_per_line`` is positive the words are wrapped with ``\N``
    after that many words. ``style_options['font_size']`` is filled in with
    5% of the video height when it is None.

    Returns:
        The Dialogue events joined by newlines.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)
    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    x = style_options.get('x')
    y = style_options.get('y')
    an_code, _, final_x, final_y = determine_alignment_code(
        position_str, alignment_str, x, y,
        video_width=video_resolution[0],
        video_height=video_resolution[1]
    )
    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    logger.info(f"[Karaoke] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")
    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
    events = []
    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue
        line_start = words[0]['start']
        elapsed_cs = 0  # centiseconds of \k timing already emitted for this Dialogue
        syllables = []
        for w_info in words:
            w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
            # Offsets relative to the Dialogue start, never moving backwards
            # even if the transcription reports overlapping words.
            start_cs = max(int(round((w_info['start'] - line_start) * 100)), elapsed_cs)
            end_cs = max(int(round((w_info['end'] - line_start) * 100)), start_cs)
            gap_cs = start_cs - elapsed_cs
            duration_cs = end_cs - start_cs
            gap_tag = f"{{\\k{gap_cs}}}" if gap_cs > 0 else ""
            syllables.append(f"{gap_tag}{{\\k{duration_cs}}}{w} ")
            elapsed_cs = end_cs
        if max_words_per_line > 0:
            lines_content = [
                ''.join(syllables[i:i + max_words_per_line]).strip()
                for i in range(0, len(syllables), max_words_per_line)
            ]
        else:
            lines_content = [''.join(syllables).strip()]
        dialogue_text = '\\N'.join(lines_content)
        start_time = format_ass_time(words[0]['start'])
        end_time = format_ass_time(words[-1]['end'])
        events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{word_color}}}{dialogue_text}")
    logger.info(f"Handled {len(events)} dialogues in karaoke style.")
    return "\n".join(events)
def handle_highlight(transcription_result, style_options, replace_dict, video_resolution):
    """Highlight style: show each line and recolour the word currently being spoken.

    For every line (the whole segment, or ``max_words_per_line`` words at a
    time) one layer-0 Dialogue spans the line in ``line_color``, and one
    layer-1 Dialogue per word repeats the line with that word wrapped in
    ``word_color`` for the duration of the word. Segments without usable
    word timestamps are skipped. Returns the events joined by newlines.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)
    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    an_code, _, final_x, final_y = determine_alignment_code(
        position_str,
        alignment_str,
        style_options.get('x'),
        style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1],
    )
    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    logger.info(f"[Highlight] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")
    # Same placement tag for every event.
    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
    events = []
    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue
        # Keep (text, start, end) for every word that is non-empty after processing.
        processed_words = [
            (w, info['start'], info['end'])
            for w, info in (
                (process_subtitle_text(entry.get('word', ''), replace_dict, all_caps, 0), entry)
                for entry in words
            )
            if w
        ]
        if not processed_words:
            continue
        # With no limit, the whole segment forms a single line.
        chunk = max_words_per_line if max_words_per_line > 0 else len(processed_words)
        line_sets = [processed_words[i:i + chunk] for i in range(0, len(processed_words), chunk)]
        for line_set in line_sets:
            # Base line: visible from the first word's start to the last word's end.
            start_time = format_ass_time(line_set[0][1])
            end_time = format_ass_time(line_set[-1][2])
            base_text = ' '.join(entry[0] for entry in line_set)
            events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{base_text}")
            # One overlay per word, with only that word recoloured.
            for idx, (_, w_start, w_end) in enumerate(line_set):
                highlighted_text = ' '.join(
                    f"{{\\c{word_color}}}{w}{{\\c{line_color}}}" if i == idx else w
                    for i, (w, _, _) in enumerate(line_set)
                )
                word_start_time = format_ass_time(w_start)
                word_end_time = format_ass_time(w_end)
                events.append(f"Dialogue: 1,{word_start_time},{word_end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{highlighted_text}")
    logger.info(f"Handled {len(events)} dialogues in highlight style.")
    return "\n".join(events)
def handle_underline(transcription_result, style_options, replace_dict, video_resolution):
    r"""Underline style: show the line and underline the word currently being spoken.

    For every word one Dialogue is emitted, lasting for that word only and
    containing the full line with the word wrapped in ``\u1`` ... ``\u0``.
    Lines are the whole segment, or ``max_words_per_line`` words at a time.
    Segments without usable word timestamps are skipped. Returns the events
    joined by newlines.
    """
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)
    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    an_code, _, final_x, final_y = determine_alignment_code(
        position_str,
        alignment_str,
        style_options.get('x'),
        style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1],
    )
    line_color = rgb_to_ass_color(style_options.get('line_color', '#FFFFFF'))
    logger.info(f"[Underline] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")
    # Same placement tag for every event.
    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
    events = []
    for segment in transcription_result['segments']:
        words = segment.get('words', [])
        if not words:
            continue
        # Keep (text, start, end) for every word that is non-empty after processing.
        processed_words = [
            (w, info['start'], info['end'])
            for w, info in (
                (process_subtitle_text(entry.get('word', ''), replace_dict, all_caps, 0), entry)
                for entry in words
            )
            if w
        ]
        if not processed_words:
            continue
        # With no limit, the whole segment forms a single line.
        chunk = max_words_per_line if max_words_per_line > 0 else len(processed_words)
        for offset in range(0, len(processed_words), chunk):
            line_set = processed_words[offset:offset + chunk]
            for idx, (_, w_start, w_end) in enumerate(line_set):
                full_text = ' '.join(
                    f"{{\\u1}}{w_text}{{\\u0}}" if w_idx == idx else w_text
                    for w_idx, (w_text, _, _) in enumerate(line_set)
                )
                start_time = format_ass_time(w_start)
                end_time = format_ass_time(w_end)
                events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{line_color}}}{full_text}")
    logger.info(f"Handled {len(events)} dialogues in underline style.")
    return "\n".join(events)
def handle_word_by_word(transcription_result, style_options, replace_dict, video_resolution):
    """Word-by-word style: each word is its own Dialogue, shown only while it is spoken.

    Words are coloured with ``word_color``. Words that are empty after
    processing and segments without word timestamps are skipped.
    ``max_words_per_line`` is still parsed but does not change the output,
    since every word is emitted on its own. Returns the events joined by
    newlines.
    """
    # Parsed for parity with the other handlers (an invalid value still raises).
    max_words_per_line = int(style_options.get('max_words_per_line', 0))
    all_caps = style_options.get('all_caps', False)
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)
    position_str = style_options.get('position', 'middle_center')
    alignment_str = style_options.get('alignment', 'center')
    an_code, _, final_x, final_y = determine_alignment_code(
        position_str,
        alignment_str,
        style_options.get('x'),
        style_options.get('y'),
        video_width=video_resolution[0],
        video_height=video_resolution[1],
    )
    word_color = rgb_to_ass_color(style_options.get('word_color', '#FFFF00'))
    logger.info(f"[Word-by-Word] position={position_str}, alignment={alignment_str}, x={final_x}, y={final_y}, an_code={an_code}")
    # Same placement tag for every event.
    position_tag = f"{{\\an{an_code}\\pos({final_x},{final_y})}}"
    events = []
    for segment in transcription_result['segments']:
        for w_info in segment.get('words', []) or ():
            w = process_subtitle_text(w_info.get('word', ''), replace_dict, all_caps, 0)
            if w:
                start_time = format_ass_time(w_info['start'])
                end_time = format_ass_time(w_info['end'])
                events.append(f"Dialogue: 0,{start_time},{end_time},Default,,0,0,0,,{position_tag}{{\\c{word_color}}}{w}")
    logger.info(f"Handled {len(events)} dialogues in word-by-word style.")
    return "\n".join(events)
# Dispatch table: caption style name -> function that renders the Dialogue events.
STYLE_HANDLERS = dict(
    classic=handle_classic,
    karaoke=handle_karaoke,
    highlight=handle_highlight,
    underline=handle_underline,
    word_by_word=handle_word_by_word,
)
def srt_to_ass(transcription_result, style_type, settings, replace_dict, video_resolution):
    """Render a transcription result as a complete ASS document in the requested style.

    ``settings`` is layered over the built-in defaults. A missing font size
    becomes 5% of the video height, and an unknown ``style_type`` falls back
    to the classic handler. Returns the ASS text, or the error dict from
    header generation when the font is unavailable.
    """
    default_style_settings = dict(
        line_color='#FFFFFF',
        word_color='#FFFF00',
        box_color='#000000',
        outline_color='#000000',
        all_caps=False,
        max_words_per_line=0,
        font_size=None,
        font_family='Arial',
        bold=False,
        italic=False,
        underline=False,
        strikeout=False,
        outline_width=2,
        shadow_offset=0,
        border_style=1,
        x=None,
        y=None,
        position='middle_center',
        alignment='center',  # default alignment
    )
    style_options = {**default_style_settings, **settings}
    if style_options['font_size'] is None:
        style_options['font_size'] = int(video_resolution[1] * 0.05)

    ass_header = generate_ass_header(style_options, video_resolution)
    header_failed = isinstance(ass_header, dict) and 'error' in ass_header
    if header_failed:
        # Font-related error: propagate the dict unchanged.
        return ass_header

    handler = STYLE_HANDLERS.get(style_type.lower())
    if not handler:
        logger.warning(f"Unknown style '{style_type}', defaulting to 'classic'.")
        handler = handle_classic
    dialogue_lines = handler(transcription_result, style_options, replace_dict, video_resolution)
    logger.info("Converted transcription result to ASS format.")
    return "".join((ass_header, dialogue_lines, "\n"))
def process_subtitle_events(transcription_result, style_type, settings, replace_dict, video_resolution):
    """Thin alias for ``srt_to_ass``: turn a transcription result into ASS text."""
    return srt_to_ass(
        transcription_result=transcription_result,
        style_type=style_type,
        settings=settings,
        replace_dict=replace_dict,
        video_resolution=video_resolution,
    )
def parse_time_string(time_str):
    """Parse ``hh:mm:ss.ms``, ``mm:ss.ms`` or plain ``ss.ms`` into float seconds.

    Args:
        time_str: Time as a string. The colon forms allow up to three
            fractional digits; a value without colons is parsed with
            ``float()``.

    Returns:
        The time in seconds.

    Raises:
        ValueError: If ``time_str`` is not a string, cannot be parsed, or
            parses to NaN. NaN compares false against everything, so it
            would pass later range validation and then disable filtering.
    """
    import math

    if not isinstance(time_str, str):
        raise ValueError("Time value must be a string in hh:mm:ss.ms format.")
    match = re.match(r"^(?:(\d+):)?(\d{1,2}):(\d{2}(?:\.\d{1,3})?)$", time_str)
    if match:
        # A missing hours group defaults to "0" so mm:ss works.
        h, m, s = match.groups(default="0")
        return int(h) * 3600 + int(m) * 60 + float(s)
    # No colon form matched: try a bare number of seconds.
    try:
        seconds = float(time_str)
    except ValueError:
        raise ValueError(f"Invalid time string: {time_str}") from None
    if math.isnan(seconds):
        raise ValueError(f"Invalid time string: {time_str}")
    return seconds
def filter_subtitle_lines(sub_content, exclude_time_ranges, subtitle_type):
    """Remove subtitle lines/blocks that overlap any of ``exclude_time_ranges``.

    Args:
        sub_content: Subtitle document text.
        exclude_time_ranges: Iterable of dicts with ``start``/``end`` time
            strings accepted by ``parse_time_string``. ``None`` or an empty
            collection means nothing is filtered.
        subtitle_type: ``'ass'`` or ``'srt'``. Any other value returns the
            content unchanged.

    Returns:
        The filtered document. For ASS only ``Dialogue:`` lines are
        candidates for removal; every other line is kept, and a trailing
        newline in the input is preserved.
    """
    # Check for "nothing to do" before touching the ranges, so that None is
    # accepted rather than failing while iterating.
    if not exclude_time_ranges:
        return sub_content

    def parse_ass_time(ass_time):
        # Best effort: a malformed timestamp is treated as 0 seconds.
        try:
            h, m, s = ass_time.split(":")
            # float() copes with any number of fractional digits.
            return int(h) * 3600 + int(m) * 60 + float(s)
        except ValueError:
            return 0

    parsed_ranges = [
        (parse_time_string(rng['start']), parse_time_string(rng['end']))
        for rng in exclude_time_ranges
    ]

    def overlaps(start, end):
        return any(start < rng_end and end > rng_start for rng_start, rng_end in parsed_ranges)

    if subtitle_type == 'ass':
        filtered_lines = []
        for line in sub_content.splitlines():
            if line.startswith("Dialogue:"):
                # Fields: Layer, Start, End, ... -> indices 1 and 2 hold the times.
                parts = line.split(",", 10)
                if len(parts) > 3 and overlaps(parse_ass_time(parts[1]), parse_ass_time(parts[2])):
                    continue
            filtered_lines.append(line)
        result = "\n".join(filtered_lines)
        if filtered_lines and sub_content.endswith("\n"):
            result += "\n"
        return result
    if subtitle_type == 'srt':
        kept = [
            sub for sub in srt.parse(sub_content)
            if not overlaps(sub.start.total_seconds(), sub.end.total_seconds())
        ]
        return srt.compose(kept)
    return sub_content
def normalize_exclude_time_ranges(exclude_time_ranges):
    """Validate exclusion ranges and return them as ``{"start", "end"}`` dicts.

    Each range must carry string ``start``/``end`` values that parse to
    non-negative times with ``end`` strictly after ``start``. The returned
    dicts keep the original strings, not the parsed seconds.

    Raises:
        ValueError: On the first range that violates those rules.
    """
    norm = []
    for rng in exclude_time_ranges:
        start, end = rng.get("start"), rng.get("end")
        if not (isinstance(start, str) and isinstance(end, str)):
            raise ValueError("exclude_time_ranges start/end must be strings in hh:mm:ss.ms format.")
        start_sec, end_sec = parse_time_string(start), parse_time_string(end)
        if any(value < 0 for value in (start_sec, end_sec)):
            raise ValueError("exclude_time_ranges start/end must be non-negative.")
        if end_sec <= start_sec:
            raise ValueError("exclude_time_ranges end must be strictly greater than start.")
        norm.append(dict(start=start, end=end))
    return norm
def generate_ass_captions_v1(video_url, captions, settings, replace, exclude_time_ranges, job_id, language='auto', PlayResX=None, PlayResY=None):
    """
    Build an ASS subtitle file for a video and return the path it was saved to.

    Captions come from one of three sources: a URL (downloaded), raw text
    (used as given), or, when ``captions`` is empty, a Whisper transcription
    of the downloaded video. Text containing '[Script Info]' is treated as
    ready-made ASS and used unchanged; any other caption text is parsed as
    SRT and rendered, which only the 'classic' style supports.

    Args:
        video_url: Location of the video; it is always downloaded, even when
            captions are supplied.
        captions: Caption URL, raw caption text, or a falsy value to transcribe.
        settings: Dict of style options; hyphens in keys are converted to
            underscores. The 'style' key selects the handler (default 'classic').
        replace: List of dicts with 'find' and 'replace' keys.
        exclude_time_ranges: Optional list of {'start', 'end'} time strings;
            overlapping subtitle lines are removed.
        job_id: Used in log messages and as the output file name.
        language: Language passed to transcription ('auto' lets it detect).
        PlayResX, PlayResY: If both are provided they are used as the script
            resolution; otherwise the resolution is probed from the video.

    Returns:
        The subtitle file path on success. On failure a dict with an "error"
        message, plus "available_fonts" when the font is not installed.
        Exceptions are not propagated: the outer handler converts them to
        the error dict.
    """
    try:
        # Validate the exclusion ranges up front. The values stay as strings;
        # filter_subtitle_lines parses them again when filtering.
        if exclude_time_ranges:
            exclude_time_ranges = normalize_exclude_time_ranges(exclude_time_ranges)
        if not isinstance(settings, dict):
            logger.error(f"Job {job_id}: 'settings' should be a dictionary.")
            return {"error": "'settings' should be a dictionary."}
        # Normalize keys by replacing hyphens with underscores
        style_options = {k.replace('-', '_'): v for k, v in settings.items()}
        if not isinstance(replace, list):
            logger.error(f"Job {job_id}: 'replace' should be a list of objects with 'find' and 'replace' keys.")
            return {"error": "'replace' should be a list of objects with 'find' and 'replace' keys."}
        # Convert 'replace' list to dictionary (later entries with the same 'find' win)
        replace_dict = {}
        for item in replace:
            if 'find' in item and 'replace' in item:
                replace_dict[item['find']] = item['replace']
            else:
                logger.warning(f"Job {job_id}: Invalid replace item {item}. Skipping.")
        # Handle deprecated 'highlight_color' by merging it into 'word_color'
        if 'highlight_color' in style_options:
            logger.warning(f"Job {job_id}: 'highlight_color' is deprecated; merging into 'word_color'.")
            style_options['word_color'] = style_options.pop('highlight_color')
        # Check font availability before doing any download or transcription work
        font_family = style_options.get('font_family', 'Arial')
        available_fonts = get_available_fonts()
        if font_family not in available_fonts:
            logger.warning(f"Job {job_id}: Font '{font_family}' not found.")
            # Return font error with available_fonts
            return {"error": f"Font '{font_family}' not available.", "available_fonts": available_fonts}
        logger.info(f"Job {job_id}: Font '{font_family}' is available.")
        # Determine if captions is a URL or raw content
        if captions and is_url(captions):
            logger.info(f"Job {job_id}: Captions provided as URL. Downloading captions.")
            try:
                captions_content = download_captions(captions)
            except Exception as e:
                logger.error(f"Job {job_id}: Failed to download captions: {str(e)}")
                return {"error": f"Failed to download captions: {str(e)}"}
        elif captions:
            logger.info(f"Job {job_id}: Captions provided as raw content.")
            captions_content = captions
        else:
            # Nothing supplied: the transcription branch below will run.
            captions_content = None
        # Download the video
        try:
            video_path = download_file(video_url, LOCAL_STORAGE_PATH)
            logger.info(f"Job {job_id}: Video downloaded to {video_path}")
        except Exception as e:
            logger.error(f"Job {job_id}: Video download error: {str(e)}")
            # For non-font errors, do NOT include available_fonts
            return {"error": str(e)}
        # Get video resolution, unless provided
        if PlayResX is not None and PlayResY is not None:
            video_resolution = (PlayResX, PlayResY)
            logger.info(f"Job {job_id}: Using provided PlayResX/PlayResY = {PlayResX}x{PlayResY}")
        else:
            video_resolution = get_video_resolution(video_path)
            logger.info(f"Job {job_id}: Video resolution detected = {video_resolution[0]}x{video_resolution[1]}")
        # Determine style type
        style_type = style_options.get('style', 'classic').lower()
        logger.info(f"Job {job_id}: Using style '{style_type}' for captioning.")
        # Determine subtitle content
        if captions_content:
            # Check if it's ASS by looking for '[Script Info]'
            if '[Script Info]' in captions_content:
                # It's ASS directly; style options and replacements are not applied to it
                subtitle_content = captions_content
                subtitle_type = 'ass'
                logger.info(f"Job {job_id}: Detected ASS formatted captions.")
            else:
                # Treat as SRT
                logger.info(f"Job {job_id}: Detected SRT formatted captions.")
                # Validate style for SRT: the converted segments carry no
                # word-level timestamps, which the other styles need
                if style_type != 'classic':
                    error_message = "Only 'classic' style is supported for SRT captions."
                    logger.error(f"Job {job_id}: {error_message}")
                    return {"error": error_message}
                transcription_result = srt_to_transcription_result(captions_content)
                # Generate ASS based on chosen style
                subtitle_content = process_subtitle_events(transcription_result, style_type, style_options, replace_dict, video_resolution)
                subtitle_type = 'ass'
        else:
            # No captions provided, generate transcription
            logger.info(f"Job {job_id}: No captions provided, generating transcription.")
            transcription_result = generate_transcription(video_path, language=language)
            # Generate ASS based on chosen style
            subtitle_content = process_subtitle_events(transcription_result, style_type, style_options, replace_dict, video_resolution)
            subtitle_type = 'ass'
        # Check for subtitle processing errors (rendering returns a dict instead of text on failure)
        if isinstance(subtitle_content, dict) and 'error' in subtitle_content:
            logger.error(f"Job {job_id}: {subtitle_content['error']}")
            # Only include 'available_fonts' if it's a font-related error
            if 'available_fonts' in subtitle_content:
                return {"error": subtitle_content['error'], "available_fonts": subtitle_content.get('available_fonts', [])}
            else:
                return {"error": subtitle_content['error']}
        # After subtitle_content is generated and before saving to file:
        if exclude_time_ranges:
            subtitle_content = filter_subtitle_lines(subtitle_content, exclude_time_ranges, subtitle_type)
            # NOTE(review): subtitle_type is only ever set to 'ass' in this
            # function, so the 'srt' branch below is not reached from here.
            if subtitle_type == 'ass':
                logger.info(f"Job {job_id}: Filtered ASS Dialogue lines due to exclude_time_ranges.")
            elif subtitle_type == 'srt':
                logger.info(f"Job {job_id}: Filtered SRT subtitle blocks due to exclude_time_ranges.")
        # Save the subtitle content
        subtitle_filename = f"{job_id}.{subtitle_type}"
        subtitle_path = os.path.join(LOCAL_STORAGE_PATH, subtitle_filename)
        try:
            with open(subtitle_path, 'w', encoding='utf-8') as f:
                f.write(subtitle_content)
            logger.info(f"Job {job_id}: Subtitle file saved to {subtitle_path}")
        except Exception as e:
            logger.error(f"Job {job_id}: Failed to save subtitle file: {str(e)}")
            return {"error": f"Failed to save subtitle file: {str(e)}"}
        return subtitle_path
    except Exception as e:
        # Last-resort boundary: log with traceback and report the failure as a dict.
        logger.error(f"Job {job_id}: Error in generate_ass_captions_v1: {str(e)}", exc_info=True)
        return {"error": str(e)}