# nca-toolkit / services / caption_video.py
# jananathbanuka
# fix issues
# 4b12e15
# Copyright (c) 2025 Stephen G. Pope
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import os
import ffmpeg
import logging
import requests
import subprocess
from services.file_management import download_file
# Default local storage directory for downloaded and generated files.
STORAGE_PATH = "/tmp/"

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory that is scanned for custom TrueType font files.
FONTS_DIR = '/usr/share/fonts/custom'

# A missing or unreadable fonts directory must not make this module
# unimportable, so it is reported and treated as "no custom fonts".
# The listing is sorted so the resulting mapping is deterministic.
try:
    _font_files = sorted(os.listdir(FONTS_DIR))
except OSError as exc:
    logger.warning(f"Fonts directory {FONTS_DIR} could not be read: {exc}")
    _font_files = []

# Map each font's base file name (without extension) to its full path.
FONT_PATHS = {
    os.path.splitext(font_file)[0]: os.path.join(FONTS_DIR, font_file)
    for font_file in _font_files
    if font_file.endswith(('.ttf', '.TTF'))
}

# List of acceptable font names (the keys of FONT_PATHS).
ACCEPTABLE_FONTS = list(FONT_PATHS)
def match_fonts():
    """Match the custom font files against the names fontconfig reports.

    Runs ``fc-list :family`` and, for every key of ``FONT_PATHS``, looks for
    output lines that contain that key (case-insensitively). From each
    matching line the text between the first and second ``:`` is collected
    (presumably the font family name; confirm against the ``fc-list`` output
    format in the target image). Lines without a ``:`` are skipped instead
    of aborting the whole scan.

    Returns:
        list[str]: The collected names, de-duplicated and sorted
        alphabetically. Empty when ``fc-list`` cannot be run, exits with a
        non-zero status, or nothing matches.

    Errors are logged rather than raised, because this function is called
    while the module is being imported.
    """
    try:
        result = subprocess.run(
            ['fc-list', ':family'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode != 0:
            logger.error(f"Error matching fonts: {result.stderr}")
            return []

        # De-duplicate the output lines once, outside the matching loops.
        fontconfig_lines = set(result.stdout.split('\n'))

        family_names = set()
        for font_key in FONT_PATHS:
            needle = font_key.lower()
            for line in fontconfig_lines:
                if needle not in line.lower():
                    continue
                fields = line.strip().split(':')
                # Guard against lines that carry no second field.
                if len(fields) > 1 and fields[1].strip():
                    family_names.add(fields[1].strip())
        return sorted(family_names)
    except Exception as e:
        # Best-effort by design: a broken fontconfig setup must not prevent
        # the module from loading.
        logger.error(f"Exception while matching fonts: {str(e)}")
        return []
# Executed once when this module is imported; errors are logged inside
# match_fonts() rather than raised.
match_fonts()
def generate_style_line(options):
    """Build the ``Style:`` line of an ASS subtitle file.

    Values are emitted in this fixed order: Name, Fontname, Fontsize,
    PrimaryColour, OutlineColour, BackColour, Bold, Italic, Underline,
    StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow,
    Alignment, MarginL, MarginR, MarginV, Encoding.

    Args:
        options: Mapping of caption options; any missing key falls back to
            the default given below.

    Returns:
        The string ``"Style: "`` followed by the comma-separated values.
    """
    pick = options.get
    ordered_values = [
        'Default',                          # Name
        pick('font_name', 'Arial'),         # Fontname
        pick('font_size', 12),              # Fontsize
        pick('primary_color', '&H00FFFFFF'),  # PrimaryColour
        pick('outline_color', '&H00000000'),  # OutlineColour
        pick('back_color', '&H00000000'),   # BackColour
        pick('bold', 0),                    # Bold
        pick('italic', 0),                  # Italic
        pick('underline', 0),               # Underline
        pick('strikeout', 0),               # StrikeOut
        100,                                # ScaleX
        100,                                # ScaleY
        0,                                  # Spacing
        0,                                  # Angle
        1,                                  # BorderStyle
        pick('outline', 1),                 # Outline
        pick('shadow', 0),                  # Shadow
        pick('alignment', 2),               # Alignment
        pick('margin_l', 10),               # MarginL
        pick('margin_r', 10),               # MarginR
        pick('margin_v', 10),               # MarginV
        pick('encoding', 1),                # Encoding
    ]
    return "Style: " + ','.join(map(str, ordered_values))
def process_captioning(file_url, caption_srt, caption_type, options, job_id):
    """Burn captions into a video with FFmpeg and return the output path.

    Args:
        file_url: URL of the source video; fetched with ``download_file``
            into ``STORAGE_PATH``.
        caption_srt: Either the caption text itself, or an ``http://`` /
            ``https://`` URL from which the caption file is downloaded.
        caption_type: Subtitle format, used as the caption file extension.
            ``'ass'`` gets a generated script header prepended; ``'srt'``
            and ``'vtt'`` downloads are stored unchanged.
        options: List of ``{"option": ..., "value": ...}`` items, converted
            with ``convert_array_to_collection``.
        job_id: Identifier used in log messages and temporary file names.

    Returns:
        Path of the captioned video, ``<STORAGE_PATH><job_id>_captioned.mp4``.
        The caller is responsible for uploading and removing this file.

    Raises:
        ffmpeg.Error: If the FFmpeg run fails (its stderr is logged).
        Exception: Any other failure is logged and re-raised unchanged.
    """
    # Initialised up front so the cleanup in ``finally`` is safe even when
    # the failure happens before either file exists.
    video_path = None
    srt_path = None
    try:
        logger.info(f"Job {job_id}: Starting download of file from {file_url}")
        video_path = download_file(file_url, STORAGE_PATH)
        logger.info(f"Job {job_id}: File downloaded to {video_path}")

        subtitle_extension = '.' + caption_type
        srt_path = os.path.join(STORAGE_PATH, f"{job_id}{subtitle_extension}")
        options = convert_array_to_collection(options)

        # ASS captions get a script header (styles + event format) in front
        # of the supplied event lines; other formats are written as-is.
        caption_style = ""
        if caption_type == 'ass':
            style_string = generate_style_line(options)
            caption_style = "\n".join([
                "",
                "[Script Info]",
                "Title: Highlight Current Word",
                "ScriptType: v4.00+",
                "[V4+ Styles]",
                "Format: Name, Fontname, Fontsize, PrimaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding",
                style_string,
                "[Events]",
                "Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text",
                "",
            ])
            logger.info(f"Job {job_id}: Generated ASS style string: {style_string}")

        if caption_srt.startswith(("http://", "https://")):
            # Download the file if caption_srt is a URL. A timeout keeps a
            # stalled server from blocking the job indefinitely.
            logger.info(f"Job {job_id}: Downloading caption file from {caption_srt}")
            response = requests.get(caption_srt, timeout=60)
            response.raise_for_status()  # Raise an exception for bad status codes
            if caption_type in ['srt', 'vtt']:
                with open(srt_path, 'wb') as srt_file:
                    srt_file.write(response.content)
            else:
                # Explicit UTF-8 so the result does not depend on the
                # process locale.
                with open(srt_path, 'w', encoding='utf-8') as srt_file:
                    srt_file.write(caption_style + response.text)
            logger.info(f"Job {job_id}: Caption file downloaded to {srt_path}")
        else:
            # Write caption_srt content directly to file
            with open(srt_path, 'w', encoding='utf-8') as srt_file:
                srt_file.write(caption_style + caption_srt)
            logger.info(f"Job {job_id}: SRT file created at {srt_path}")

        output_path = os.path.join(STORAGE_PATH, f"{job_id}_captioned.mp4")
        logger.info(f"Job {job_id}: Output path set to {output_path}")

        # Report whether the requested font is one of the custom font files.
        # NOTE(review): the filter below still receives font_name unchanged;
        # the "Using default font Arial" wording is not enforced here.
        font_name = options.get('font_name', 'Arial')
        if font_name in FONT_PATHS:
            logger.info(f"Job {job_id}: Font path set to {FONT_PATHS[font_name]}")
        else:
            logger.warning(f"Job {job_id}: Font {font_name} not found. Using default font Arial.")

        if subtitle_extension == '.ass':
            # For ASS subtitles the styles come from the file itself, so no
            # force_style override is applied.
            subtitle_filter = f"subtitles='{srt_path}'"
            logger.info(f"Job {job_id}: Using ASS subtitle filter: {subtitle_filter}")
        else:
            # Construct FFmpeg filter options for subtitles with detailed styling
            style_options = {
                'FontName': font_name,  # Use the font name instead of the font file path
                'FontSize': options.get('font_size', 24),
                'PrimaryColour': options.get('primary_color', '&H00FFFFFF'),
                'SecondaryColour': options.get('secondary_color', '&H00000000'),
                'OutlineColour': options.get('outline_color', '&H00000000'),
                'BackColour': options.get('back_color', '&H00000000'),
                'Bold': options.get('bold', 0),
                'Italic': options.get('italic', 0),
                'Underline': options.get('underline', 0),
                'StrikeOut': options.get('strikeout', 0),
                'Alignment': options.get('alignment', 2),
                'MarginV': options.get('margin_v', 10),
                'MarginL': options.get('margin_l', 10),
                'MarginR': options.get('margin_r', 10),
                'Outline': options.get('outline', 1),
                'Shadow': options.get('shadow', 0),
                'Blur': options.get('blur', 0),
                'BorderStyle': options.get('border_style', 1),
                'Encoding': options.get('encoding', 1),
                'Spacing': options.get('spacing', 0),
                'Angle': options.get('angle', 0),
                'UpperCase': options.get('uppercase', 0),
            }
            # Add only populated options to the subtitle filter
            forced_style = ','.join(
                f"{k}={v}" for k, v in style_options.items() if v is not None
            )
            # The path is quoted the same way as in the ASS branch.
            subtitle_filter = f"subtitles='{srt_path}':force_style='{forced_style}'"
            logger.info(f"Job {job_id}: Using subtitle filter: {subtitle_filter}")

        try:
            # Log the FFmpeg command for debugging
            logger.info(f"Job {job_id}: Running FFmpeg with filter: {subtitle_filter}")
            # Run FFmpeg to add subtitles to the video. stderr is captured
            # so that a failure can be reported with FFmpeg's own message.
            ffmpeg.input(video_path).output(
                output_path,
                vf=subtitle_filter,
                acodec='copy'
            ).run(capture_stderr=True)
            logger.info(f"Job {job_id}: FFmpeg processing completed, output file at {output_path}")
        except ffmpeg.Error as e:
            # Log the FFmpeg stderr output
            if e.stderr:
                error_message = e.stderr.decode('utf8', errors='replace')
            else:
                error_message = 'Unknown FFmpeg error'
            logger.error(f"Job {job_id}: FFmpeg error: {error_message}")
            raise

        # The upload process will be handled by the calling function
        return output_path
    except Exception as e:
        logger.error(f"Job {job_id}: Error in process_captioning: {str(e)}")
        raise
    finally:
        # Remove the intermediate inputs on success and on failure. The
        # output file is deliberately left in place for the caller.
        for temp_path in (video_path, srt_path):
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError as cleanup_error:
                    logger.warning(f"Job {job_id}: Could not remove {temp_path}: {cleanup_error}")
        logger.info(f"Job {job_id}: Local files cleaned up")
def convert_array_to_collection(options):
    """Convert a list of option items into a plain dictionary.

    Args:
        options: Iterable of mappings that each carry an ``"option"`` key
            (the option name) and a ``"value"`` key, or ``None``/empty when
            no options were supplied.

    Returns:
        dict: ``{item["option"]: item["value"]}`` for every item; when a
        name occurs more than once the last value wins. An empty dict for
        ``None`` or empty input.

    Raises:
        KeyError: If an item lacks the ``"option"`` or ``"value"`` key.
    """
    logger.info(f"Converting options array to dictionary: {options}")
    # Missing options mean "use the defaults", not an error.
    if not options:
        return {}
    return {item["option"]: item["value"] for item in options}