Spaces:
Sleeping
Sleeping
| import subprocess | |
| import os | |
| import tempfile | |
| import requests | |
| import re | |
| import textwrap | |
| import shutil | |
| import time | |
| import json | |
| from datetime import datetime | |
| from PIL import Image, ImageDraw, ImageFont | |
| import base64 | |
| from io import BytesIO | |
| from thefuzz import fuzz | |
| import asyncio | |
| from io import BytesIO | |
| from typing import Optional | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| from starlette.background import BackgroundTask | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import logging | |
# Shared HTTP session so repeated downloads reuse TCP connections.
_http_session = requests.Session()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stitcher")
| # ======================================== | |
| # CONFIGURATION SECTION - CUSTOMIZE HERE | |
| # ======================================== | |
# Layout/typography for the Reddit hook card (see create_reddit_card_with_text).
REDDIT_CONFIG = {
    'template_file': 'reddit_template.png',  # background template next to this script
    'font_file': 'Satoshi-Bold.otf',
    'font_size_max': 180,                    # auto-fit scans down from here...
    'font_size_min': 16,                     # ...to here, in steps of 2
    'text_wrap_width': 35,                   # characters per wrapped line
    'text_color': 'black',
    'line_spacing': 10,                      # px between wrapped lines
    'text_box_width_percent': 0.85,          # usable text box as fraction of template size
    'text_box_height_percent': 0.65,
    'y_offset': 20,                          # px shift below vertical center
}
# Styling for the body subtitles' ASS "Default" style (create_body_ass_from_json).
SUBTITLE_CONFIG = {
    'font_file': 'LilitaOne-Regular.ttf',
    'font_name': 'Lilita One',       # family name libass resolves via fontconfig
    'font_size_default': 11,
    'position_alignment': 5,         # ASS alignment: 5 = middle-center
    'margin_left': 70,
    'margin_right': 80,
    'margin_vertical': 20,
    'line_spacing': 2
}
# Global rendering knobs used by stitch_media and the subtitle generators.
VIDEO_CONFIG = {
    'reddit_scale_percent': 0.75,    # Reddit card width as a fraction of video width
    'fade_start_percent': 0.70,      # fade-out window, as fractions of audio duration
    'fade_end_percent': 0.85,
    'promo_percent': 0.094,          # fallback: cover occupies the last ~9.4% of audio
    'fade_color_rgb': (218, 207, 195),  # NOTE(review): looks unused — stitch_media hard-codes "#dacfc3"; confirm
    'max_chunk_size' : 50,           # cap on words per body-subtitle chunk
    'speed_factor' : 1.1 ,           # global audio + timestamp speed-up
    'threads': 2,
    'encoding_preset': 'fast'
}
| # ======================================== | |
| # END CONFIGURATION SECTION | |
| # ======================================== | |
| # ========================= | |
| # HELPER FUNCTIONS | |
| # ========================= | |
def detect_hw_encoder():
    """Probe ffmpeg for a working hardware H.264 encoder.

    Lists the encoders ffmpeg was built with, then verifies each candidate
    with a tiny test encode — an encoder being listed does not guarantee the
    driver/hardware actually works in this environment.

    Returns:
        (encoder_name, quality_value) for the first verified encoder, or
        (None, None) when none works and libx264 should be used instead.
    """
    try:
        result = subprocess.run(["ffmpeg", "-encoders"], capture_output=True, text=True)
        candidates = []
        if 'h264_videotoolbox' in result.stdout:
            candidates.append(('h264_videotoolbox', '65'))  # macOS (different -q:v scale)
        if 'h264_nvenc' in result.stdout:
            candidates.append(('h264_nvenc', '23'))  # NVIDIA
        if 'h264_qsv' in result.stdout:
            candidates.append(('h264_qsv', '23'))  # Intel QuickSync
        # Test each encoder with a tiny encode
        for encoder, quality in candidates:
            try:
                test = subprocess.run([
                    "ffmpeg", "-f", "lavfi", "-i", "color=c=black:s=16x16:d=0.1",
                    "-c:v", encoder, "-q:v", quality,
                    "-f", "null", "-"
                ], capture_output=True, text=True, timeout=5)
                if test.returncode == 0:
                    logger.info(f"✅ GPU encoder verified: {encoder}")
                    return encoder, quality
                logger.warning(f"⚠️ {encoder} listed but failed test encode")
            except subprocess.TimeoutExpired:
                # Narrowed from a bare `except:` — previously ANY error here
                # (including KeyboardInterrupt) was mislabeled as a timeout.
                logger.warning(f"⚠️ {encoder} test timed out")
    except (OSError, subprocess.SubprocessError) as e:
        # ffmpeg missing or not runnable; fall through to the CPU path.
        logger.debug("Hardware encoder probe skipped: %s", e)
    logger.info("ℹ️ No working GPU encoder — using libx264")
    return None, None
# Probe once at import time; the result is reused for every render.
_hw_encoder, _hw_quality = detect_hw_encoder()
def get_intermediate_encode_flags(crf_quality):
    """Return ffmpeg video-encoder flags for intermediate segment renders.

    Prefers the hardware encoder detected at import time; otherwise falls
    back to software libx264 at the given CRF with the configured preset
    and thread count.
    """
    if not _hw_encoder:
        # CPU fallback path.
        return [
            "-c:v", "libx264",
            "-crf", str(crf_quality),
            "-preset", VIDEO_CONFIG['encoding_preset'],
            "-threads", str(VIDEO_CONFIG['threads']),
        ]
    # GPU path: the quality value was validated together with the encoder.
    return ["-c:v", _hw_encoder, "-q:v", _hw_quality]
def sec_to_ass_time(seconds):
    """Convert seconds (e.g. 1.219) to the ASS timestamp format H:MM:SS.cs."""
    total_ms = int(seconds * 1000)  # truncate to whole milliseconds
    hours = total_ms // 3_600_000
    minutes = (total_ms // 60_000) % 60
    secs = (total_ms // 1_000) % 60
    centis = (total_ms % 1_000) // 10  # ASS uses centisecond precision
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"
def setup_custom_fonts_hf(temp_dir):
    """Stage bundled fonts into *temp_dir* and build a fontconfig environment.

    Copies every .ttf/.otf under the repo's ``fonts/`` directory (plus the
    two configured font files next to this script) into ``temp_dir/fonts``,
    then writes a fonts.conf pointing at that directory so ffmpeg/libass can
    resolve the fonts.  Returns a copy of ``os.environ`` extended with
    FONTCONFIG_FILE/FONTCONFIG_PATH when at least one font was staged;
    on any failure it falls back to a plain environment copy (best effort).
    """
    try:
        staging_dir = os.path.join(temp_dir, 'fonts')
        os.makedirs(staging_dir, exist_ok=True)
        base_dir = os.path.dirname(os.path.abspath(__file__))
        bundled_dir = os.path.join(base_dir, 'fonts')
        sources = []
        if os.path.exists(bundled_dir):
            sources = [
                os.path.join(bundled_dir, name)
                for name in os.listdir(bundled_dir)
                if name.endswith(('.ttf', '.otf', '.TTF', '.OTF'))
            ]
        # The two configured fonts may live beside the script instead.
        for configured in (REDDIT_CONFIG['font_file'], SUBTITLE_CONFIG['font_file']):
            candidate = os.path.join(base_dir, configured)
            if os.path.exists(candidate) and candidate not in sources:
                sources.append(candidate)
        for src in sources:
            shutil.copy(src, os.path.join(staging_dir, os.path.basename(src)))
        if not sources:
            return os.environ.copy()
        conf_path = os.path.join(temp_dir, 'fonts.conf')
        with open(conf_path, 'w') as f:
            f.write(f"""<?xml version="1.0"?>
<fontconfig><dir>{staging_dir}</dir><cachedir>{temp_dir}/cache</cachedir></fontconfig>""")
        env = os.environ.copy()
        env['FONTCONFIG_FILE'] = conf_path
        env['FONTCONFIG_PATH'] = temp_dir
        return env
    except Exception:
        # Fonts are optional: never let font staging block a render.
        return os.environ.copy()
def download_file_from_url(url, output_dir, filename):
    """Stream *url* into ``output_dir/filename`` and return the local path.

    Raises:
        Exception: wrapping any network or filesystem failure.  The original
        cause is chained (``from e``) so HTTP errors stay distinguishable
        from disk errors when debugging — the old code dropped the chain.
    """
    try:
        response = _http_session.get(url, stream=True, timeout=30)
        response.raise_for_status()
        file_path = os.path.join(output_dir, filename)
        with open(file_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        return file_path
    except Exception as e:
        raise Exception(f"Failed to download file: {str(e)}") from e
def download_book_cover(book_id, output_dir):
    """Fetch the Google Books front cover for *book_id* into *output_dir*.

    The downloaded bytes are verified to be a decodable image before the
    path is returned (rejects HTML error pages / truncated responses).

    Raises:
        Exception: wrapping download or validation failure; the original
        cause is chained (the old code dropped it).
    """
    try:
        image_url = f"https://books.google.com/books/publisher/content/images/frontcover/{book_id}"
        response = _http_session.get(image_url, timeout=30)
        response.raise_for_status()
        image_path = os.path.join(output_dir, 'book_cover.png')
        with open(image_path, 'wb') as f:
            f.write(response.content)
        Image.open(image_path).verify()  # raises if the payload is not an image
        return image_path
    except Exception as e:
        raise Exception(f"Failed to download book cover: {str(e)}") from e
def decode_base64_image(base64_string, output_dir):
    """Decode a (possibly data-URI prefixed) base64 payload to a PNG on disk.

    Returns the path of the saved PNG inside *output_dir*.

    Raises:
        Exception: wrapping decode/validation failure; the original cause
        is chained (the old code dropped it).
    """
    try:
        # Strip a "data:image/...;base64," prefix when present.
        if ',' in base64_string and 'base64' in base64_string:
            base64_string = base64_string.split(',', 1)[1]
        image_data = base64.b64decode(base64_string.strip())
        Image.open(BytesIO(image_data)).verify()
        output_path = os.path.join(output_dir, f"book_cover_b64_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
        # verify() leaves the Image object unusable, so re-open before saving.
        Image.open(BytesIO(image_data)).save(output_path, 'PNG')
        return output_path
    except Exception as e:
        raise Exception(f"Base64 decode failed: {str(e)}") from e
def validate_book_cover_input(book_cover_file, book_cover_url, book_cover_base64, book_id, temp_dir):
    """Resolve the book cover from exactly one of four input methods.

    Accepts an uploaded file path, a URL, a base64 payload, or a Google
    Books volume id.  Returns ``(local_path, None)`` on success, or
    ``(None, error_message)`` when zero or multiple methods are supplied,
    or when the chosen method fails.
    """
    provided = {
        'file': book_cover_file is not None,
        'url': bool(book_cover_url and book_cover_url.strip()),
        'base64': bool(book_cover_base64 and book_cover_base64.strip()),
        'id': bool(book_id and book_id.strip()),
    }
    supplied = sum(provided.values())
    if supplied == 0:
        return None, "❌ Book cover is required"
    if supplied > 1:
        return None, "❌ Book Cover: Use only ONE method"
    try:
        if provided['file']:
            return str(book_cover_file), None
        if provided['url']:
            stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            return download_file_from_url(book_cover_url.strip(), temp_dir, f"book_cover_{stamp}.png"), None
        if provided['base64']:
            return decode_base64_image(book_cover_base64.strip(), temp_dir), None
        if provided['id']:
            return download_book_cover(book_id.strip(), temp_dir), None
    except Exception as e:
        return None, f"❌ Book cover error: {str(e)}"
    return None, None
def get_video_info(video_path):
    """Return ``(width, height, fps)`` of the first video stream.

    Uses a single ffprobe invocation with JSON output (the original spawned
    ffprobe twice — once for dimensions, once for frame rate).  Fractional
    frame rates such as ``30000/1001`` are handled.

    Raises:
        Exception: wrapping any probe or parse failure (cause chained).
    """
    try:
        cmd = [
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=width,height,r_frame_rate",
            "-of", "json", video_path,
        ]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        stream = json.loads(result.stdout)["streams"][0]
        fps_str = str(stream["r_frame_rate"])
        if '/' in fps_str:
            num, den = fps_str.split('/')
            fps = float(num) / float(den)
        else:
            fps = float(fps_str)
        return int(stream["width"]), int(stream["height"]), fps
    except Exception as e:
        raise Exception(f"Failed to get video info: {str(e)}") from e
def get_audio_duration(audio_path):
    """Return the container duration of *audio_path* in seconds (float)."""
    probe_cmd = [
        "ffprobe", "-v", "error",
        "-show_entries", "format=duration",
        "-of", "default=noprint_wrappers=1:nokey=1",
        audio_path,
    ]
    try:
        probe = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
        return float(probe.stdout.strip())
    except Exception as e:
        raise Exception(f"Failed to get audio duration: {str(e)}")
def create_reddit_card_with_text(template_path, hook_text, output_dir, config=REDDIT_CONFIG):
    """Render *hook_text* onto the Reddit-card template and save a PNG.

    Finds the largest font size (scanning down from font_size_max to
    font_size_min in steps of 2) whose wrapped text fits the configured
    text box, draws it centered (plus y_offset), and writes
    ``reddit_card_composite.png`` into *output_dir*.

    Raises:
        Exception: when the configured font cannot be found or loaded.
    """
    template = Image.open(template_path).convert('RGBA')
    temp_w, temp_h = template.size
    box_w = int(temp_w * config['text_box_width_percent'])
    box_h = int(temp_h * config['text_box_height_percent'])
    script_dir = os.path.dirname(os.path.abspath(__file__))
    font_paths = [
        os.path.join(script_dir, 'fonts', config['font_file']),
        os.path.join(script_dir, config['font_file']),
    ]

    def load_font_strict(size):
        # Try each candidate path; a present-but-corrupt font is logged and skipped.
        for fp in font_paths:
            if os.path.exists(fp):
                try:
                    return ImageFont.truetype(fp, size)
                except Exception as e:
                    logger.error(f"⚠️ Font exists but failed to load: {fp} — {e}")
        fonts_dir = os.path.join(script_dir, 'fonts')
        available = os.listdir(fonts_dir) if os.path.exists(fonts_dir) else 'directory missing'
        raise Exception(
            f"Reddit card font '{config['font_file']}' not found. "
            f"Searched: {font_paths}. "
            f"Available in fonts/: {available}"
        )

    # textwrap.fill depends only on the configured width — not the font size —
    # so compute it once instead of identically on every loop iteration.
    wrapped = textwrap.fill(hook_text, width=config['text_wrap_width'])
    draw_scratch = ImageDraw.Draw(Image.new('RGBA', (1, 1)))
    best_font_size = config['font_size_min']
    # Fall back to the WRAPPED text: the original fell back to the raw hook
    # text, which could overflow the card when no size fit.
    best_wrapped_text = wrapped
    for font_size in range(config['font_size_max'], config['font_size_min'] - 1, -2):
        font = load_font_strict(font_size)
        bbox = draw_scratch.multiline_textbbox((0, 0), wrapped, font=font, spacing=config['line_spacing'])
        if bbox[2] - bbox[0] <= box_w and bbox[3] - bbox[1] <= box_h:
            best_font_size = font_size
            break
    font = load_font_strict(best_font_size)
    draw = ImageDraw.Draw(template)
    bbox = draw.multiline_textbbox((0, 0), best_wrapped_text, font=font, spacing=config['line_spacing'])
    x = (temp_w - (bbox[2] - bbox[0])) / 2
    y = (temp_h - (bbox[3] - bbox[1])) / 2 + config['y_offset']
    draw.multiline_text((x, y), best_wrapped_text, fill=config['text_color'], font=font, spacing=config['line_spacing'], align='left')
    output_path = os.path.join(output_dir, 'reddit_card_composite.png')
    template.save(output_path, 'PNG')
    return output_path
def validate_and_get_file(uploaded_file, url_string, file_type, temp_dir):
    """Resolve *file_type* from either an upload or a URL (exactly one).

    Returns ``(local_path, None)`` on success or ``(None, error_message)``
    when neither/both methods are given or the download fails.
    """
    has_upload = uploaded_file is not None
    has_url = bool(url_string and url_string.strip())  # was a raw str; now a real bool
    if not has_upload and not has_url:
        return None, f"❌ Please provide {file_type}"
    if has_upload and has_url:
        return None, f"❌ Use only ONE method for {file_type}"
    if has_upload:
        return str(uploaded_file), None
    # URL path (exactly one method is set at this point, so no trailing
    # "unknown error" return is needed — the original's was unreachable).
    try:
        # Keep the URL's basename for readability; fall back to 'file'.
        # (The original called split('/') twice for the same value.)
        basename = url_string.split('/')[-1] or 'file'
        fname = f"{file_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{basename}"
        return download_file_from_url(url_string.strip(), temp_dir, fname), None
    except Exception as e:
        return None, f"❌ Error downloading {file_type}: {str(e)}"
| # ============================================ | |
| # JSON LOGIC: PARSERS & SUBTITLE GENERATORS | |
| # ============================================ | |
def extract_first_subtitle(json_path):
    """Return ``(text, start, end)`` for the transcript's opening sentence.

    Walks the word-level timings, collecting words until one ends with
    sentence-final punctuation (``.``, ``!`` or ``?``); this caption goes on
    the Reddit card.  Falls back to ``("No subtitle found", 0.0, 3.0)`` on
    empty or unreadable input, and to a 3.0 s end time when the transcript
    contains no terminating punctuation at all.
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as fh:
            transcript = json.load(fh)
        collected = []
        sentence_start = None
        default_end = 3.0
        word_stream = (
            w
            for seg in transcript.get('segments', [])
            for w in seg.get('words', [])
        )
        for word_data in word_stream:
            token = word_data.get('text', '').strip()
            if not token:
                continue
            if sentence_start is None:
                sentence_start = word_data.get('start_time', 0.0)
            collected.append(token)
            if re.search(r'[.!?]$', token):
                # First sentence terminator found — stop here.
                return " ".join(collected), sentence_start, word_data.get('end_time', 3.0)
        if collected:
            # No punctuation anywhere: use everything with the default end.
            return " ".join(collected), sentence_start, default_end
        return "No subtitle found", 0.0, 3.0
    except Exception as e:
        logger.error(f"Error extracting first subtitle: {e}")
        return "No subtitle found", 0.0, 3.0
| # ============================================ | |
| # FINDS BOOK TITLE WORD'S EXACT TIMINGS | |
| # ============================================ | |
def find_title_and_cta(json_path, book_title):
    """Locate when *book_title* is spoken; return ``(start_sec, end_sec)``.

    Flattens the word-level transcript and fuzzy-matches sliding windows of
    roughly the title's word count against the punctuation-stripped title.
    Returns ``(None, None)`` when no window scores at least 85, when the
    title is empty, or on any read/parse error.
    """
    try:
        if not book_title or not book_title.strip():
            return None, None
        with open(json_path, 'r', encoding='utf-8') as fh:
            transcript = json.load(fh)
        title_clean = re.sub(r'[^\w\s]', '', book_title.lower()).strip()
        n_title_words = len(title_clean.split())
        # Flatten every non-empty word with its timings.
        timed_words = [
            {
                'text': w.get('text', '').strip(),
                'start': w.get('start_time', 0.0),
                'end': w.get('end_time', 0.0),
            }
            for seg in transcript.get('segments', [])
            for w in seg.get('words', [])
            if w.get('text', '').strip()
        ]
        best = (0, None, None)  # (score, start, end)
        # Same-size windows first, then one wider and one narrower, to
        # survive fuzzy/misheard transcriptions.
        for span in (n_title_words, n_title_words + 1, n_title_words - 1):
            if span <= 0:
                continue
            for i in range(len(timed_words) - span + 1):
                window = timed_words[i:i + span]
                candidate = " ".join(w['text'] for w in window).lower()
                candidate = re.sub(r'[^\w\s]', '', candidate).strip()
                score = fuzz.ratio(title_clean, candidate)
                if score > best[0]:
                    best = (score, window[0]['start'], window[-1]['end'])
        # Only a strong match counts; otherwise callers fall back to
        # percentage-based timing.
        if best[0] >= 85:
            return best[1], best[2]
        return None, None
    except Exception as e:
        logger.error(f"Error finding title: {e}")
        return None, None
def create_body_ass_from_json(json_path, output_dir, highlight_color='yellow',
                              font_size=None, start_time_sec=0.0, config=SUBTITLE_CONFIG,
                              stop_time_sec=None):
    """Create word-highlight body subtitles (.ass) from word-level JSON timings.

    Chunks start at 1 word and grow by 4 per chunk, capped at
    VIDEO_CONFIG['max_chunk_size'].  One Dialogue line is emitted per word so
    the currently spoken word is highlighted within its chunk.  Words
    starting before ``start_time_sec`` or at/after ``stop_time_sec`` are
    dropped (0.1 s tolerance on both bounds).  Returns the .ass file path.
    """
    if font_size is None: font_size = config['font_size_default']
    # color name -> (highlight box colour, highlighted text colour), ASS &HAABBGGRR form.
    color_map = {'yellow': ('&H00000000', '&H0000FFFF'), 'orange': ('&H0000A5FF', '&H00000000'), 'green': ('&H0000FF00', '&H00000000'), 'cyan': ('&H00FFFF00', '&H00000000'), 'pink': ('&H00FF69B4', '&H00000000'), 'red': ('&H000000FF', '&H00FFFFFF'), 'blue': ('&H00FF0000', '&H00FFFFFF')}
    highlight_bg, highlight_text = color_map.get(highlight_color.lower(), ('&H00000000', '&H0000FFFF'))
    ass_path = os.path.join(output_dir, 'body_subtitles.ass')
    ass_header = f"""[Script Info]
Title: Body JSON Subtitles
ScriptType: v4.00+
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{config['font_name']},{font_size},&H00FFFFFF,&H00FFFFFF,&H00000000,&H80000000,0,0,0,0,100,85,0,0,1,3,1,{config['position_alignment']},{config['margin_left']},{config['margin_right']},{config['margin_vertical']},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"""
    with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f)
    all_words = []
    for segment in data.get('segments', []):
        for word_data in segment.get('words', []):
            word_text = word_data.get('text', '').strip()
            start_ms = word_data.get('start_time', 0)  # NOTE: value is in seconds despite the name
            if start_ms < start_time_sec - 0.1: continue
            if stop_time_sec is not None and start_ms >= stop_time_sec - 0.1: continue
            if word_text:
                all_words.append({'word': word_text, 'start': start_ms, 'end': word_data.get('end_time', 0)})
    # Build chunks of growing size: 1 word, then 5, 9, ... up to max_chunk_size.
    chunks = []
    i = 0
    current_chunk_size = 1
    max_chunk_size = VIDEO_CONFIG['max_chunk_size']
    while i < len(all_words):
        remaining = len(all_words) - i
        take = min(current_chunk_size, remaining)
        chunks.append(all_words[i : i + take])
        i += take
        if current_chunk_size < max_chunk_size:
            current_chunk_size = min(current_chunk_size + 4, max_chunk_size)
    ass_events = []
    for chunk in chunks:
        chunk_text_only = [item['word'] for item in chunk]
        frame_end = chunk[-1]['end']
        for idx, info in enumerate(chunk):
            # Each word's highlight lasts until the next word starts
            # (or until the chunk's final end time for the last word).
            w_start = info['start']
            w_end = chunk[idx+1]['start'] if idx + 1 < len(chunk) else frame_end
            text_parts = []
            for j, word_str in enumerate(chunk_text_only):
                if j == idx: text_parts.append(f"{{\\c{highlight_text}\\3c{highlight_bg}\\bord5}}{word_str}{{\\r}}")
                else: text_parts.append(word_str)
            ass_events.append(f"Dialogue: 0,{sec_to_ass_time(w_start)},{sec_to_ass_time(w_end)},Default,,0,0,0,,{' '.join(text_parts)}")
    with open(ass_path, 'w', encoding='utf-8') as f:
        f.write(ass_header + '\n'.join(ass_events))
    return ass_path
def create_cta_ass_from_json(json_path, output_dir, start_sec, font_size, video_width, video_height, highlight_color='yellow', config=SUBTITLE_CONFIG, words_per_frame=10):
    """Create the chunky, Instagram-style boxed CTA subtitles (.ass).

    Only words starting at/after ``start_sec`` (0.1 s tolerance) are used.
    Consecutive "Book" + "Access..." tokens are merged into "BookXcess" to
    repair a common mistranscription of the brand name.  Returns the .ass
    file path.
    """
    # color name -> (box colour, highlighted text colour); only the text
    # colour (\c) is overridden per word here — the box itself comes from
    # the style's BorderStyle=3 outline.
    color_map = {
        'yellow': ('&H00000000', '&H0000FFFF'), 'orange': ('&H0000A5FF', '&H00000000'),
        'green': ('&H0000FF00', '&H00000000'), 'cyan': ('&H00FFFF00', '&H00000000'),
        'pink': ('&H00FF69B4', '&H00000000'), 'red': ('&H000000FF', '&H00FFFFFF'),
        'blue': ('&H00FF0000', '&H00FFFFFF')
    }
    highlight_bg, highlight_text = color_map.get(highlight_color.lower(), ('&H00000000', '&H0000FFFF'))
    margin_lr = int(video_width * 0.125) + 40  # keep the boxed text off the screen edges
    ass_path = os.path.join(output_dir, 'cta_subtitles.ass')
    # Style logic: WrapStyle=1, BorderStyle=3, Outline=10 (Tight Instagram Box)
    ass_header = f"""[Script Info]
Title: CTA JSON Subtitles
ScriptType: v4.00+
PlayResX: {video_width}
PlayResY: {video_height}
WrapStyle: 1
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{config['font_name']},{font_size},&H00FFFFFF,&H00FFFFFF,&H00000000,&H80000000,0,0,0,0,100,90,0,0,3,10,0,5,{margin_lr},{margin_lr},0,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"""
    with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f)
    all_cta_words = []
    for segment in data.get('segments', []):
        for word_data in segment.get('words', []):
            word_text = word_data.get('text', '').strip()
            start_ms = word_data.get('start_time', 0)  # NOTE: value is in seconds despite the name
            if start_ms < start_sec - 0.1: continue  # Skip words before the CTA starts
            if word_text:
                # Merge "Book" and "Access" into "BookXcess"
                if word_text.lower().startswith('access') and len(all_cta_words) > 0 and all_cta_words[-1]['word'].lower() == 'book':
                    # Keep any trailing punctuation (like commas or periods) from "Access"
                    punctuation = word_text[6:]
                    all_cta_words[-1]['word'] = 'BookXcess' + punctuation
                    # Extend the highlight time to cover both words
                    all_cta_words[-1]['end'] = word_data.get('end_time', 0)
                    continue  # Skip adding "Access" as a separate word
                all_cta_words.append({'word': word_text, 'start': start_ms, 'end': word_data.get('end_time', 0)})
    # Chunk words_per_frame at a time, absorbing a 1-2 word remainder into
    # the final chunk so the last frame is never a lone orphan word.
    chunks = []
    i = 0
    while i < len(all_cta_words):
        remaining = len(all_cta_words) - i
        take = remaining if words_per_frame < remaining <= words_per_frame + 2 else min(words_per_frame, remaining)
        chunks.append(all_cta_words[i : i + take])
        i += take
    ass_events = []
    for chunk in chunks:
        chunk_text_only = [item['word'] for item in chunk]
        frame_end = chunk[-1]['end']
        for idx, info in enumerate(chunk):
            # One Dialogue per word: the highlight runs until the next word starts.
            w_start = info['start']
            w_end = chunk[idx+1]['start'] if idx + 1 < len(chunk) else frame_end
            text_parts = []
            for j, word_str in enumerate(chunk_text_only):
                if j == idx: text_parts.append(f"{{\\c{highlight_text}}}{word_str}{{\\r}}")
                else: text_parts.append(word_str)
            ass_events.append(f"Dialogue: 1,{sec_to_ass_time(w_start)},{sec_to_ass_time(w_end)},Default,,0,0,0,,{' '.join(text_parts)}")
    with open(ass_path, 'w', encoding='utf-8') as f:
        f.write(ass_header + '\n'.join(ass_events))
    return ass_path
| # ========================= | |
| # MAIN STITCH FUNCTION | |
| # ========================= | |
def stitch_media(video_file, video_url, audio_file, audio_url, subtitle_file, subtitle_url, book_cover_file, book_cover_url, book_cover_base64, book_id, book_title, enable_highlight, highlight_color, font_size, crf_quality=23):
    """End-to-end render: background video + voiceover + subtitles + book cover.

    Pipeline: resolve inputs -> speed up audio and JSON timings by
    VIDEO_CONFIG['speed_factor'] -> overlay the Reddit hook card during the
    first sentence -> fuzzy-locate the spoken book title -> render three
    segments (fading main video, solid colour hold, book cover) in parallel
    -> concat and burn body + CTA subtitles.

    Returns:
        (output_path, success_message) on success or (None, error_message)
        on any failure — this function never raises.
    """
    temp_dir = tempfile.mkdtemp()
    logger.info("🚀 Starting video stitching...")
    try:
        # fontconfig environment so libass can resolve the bundled fonts.
        ffmpeg_env = setup_custom_fonts_hf(temp_dir)
        # --- Resolve the three required inputs (upload OR url, one each) ---
        video_path, v_err = validate_and_get_file(video_file, video_url, 'video', temp_dir)
        if v_err: return None, v_err
        audio_path, a_err = validate_and_get_file(audio_file, audio_url, 'audio', temp_dir)
        if a_err: return None, a_err
        subtitle_path, s_err = validate_and_get_file(subtitle_file, subtitle_url, 'subtitle', temp_dir)
        if s_err: return None, s_err
        # PRE-PROCESS SPEED
        speed_factor = VIDEO_CONFIG['speed_factor']
        # 1. Physically speed up the audio file
        fast_audio = os.path.join(temp_dir, f"fast_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3")
        subprocess.run(["ffmpeg", "-v", "error", "-y", "-i", audio_path, "-filter:a", f"atempo={speed_factor}", "-threads", str(VIDEO_CONFIG['threads']), fast_audio], check=True)
        audio_path = fast_audio  # Trick the script into using the fast audio!
        # 2. Physically shrink the JSON timestamps to match the faster audio
        fast_json = os.path.join(temp_dir, f"fast_subs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(subtitle_path, 'r', encoding='utf-8') as f: json_data = json.load(f)
        for segment in json_data.get('segments', []):
            segment['start_time'] = segment.get('start_time', 0) / speed_factor
            segment['end_time'] = segment.get('end_time', 0) / speed_factor
            for word in segment.get('words', []):
                word['start_time'] = word.get('start_time', 0) / speed_factor
                word['end_time'] = word.get('end_time', 0) / speed_factor
        with open(fast_json, 'w', encoding='utf-8') as f: json.dump(json_data, f)
        subtitle_path = fast_json  # Trick the script into using the fast subtitles!
        video_width, video_height, video_fps = get_video_info(video_path)
        audio_duration = get_audio_duration(audio_path)  # duration of the sped-up audio
        script_dir = os.path.dirname(os.path.abspath(__file__))
        reddit_template_path = os.path.join(script_dir, REDDIT_CONFIG['template_file'])
        has_reddit_template = os.path.exists(reddit_template_path)
        first_sub_start = 0
        first_sub_end = 0
        if has_reddit_template:
            # The card is shown for exactly the duration of the first sentence.
            first_sub_text, first_sub_start, first_sub_end = extract_first_subtitle(subtitle_path)
            logger.info(f"📱 Reddit Overlay: '{first_sub_text[:30]}...'")
            reddit_card_path = create_reddit_card_with_text(reddit_template_path, first_sub_text, temp_dir, REDDIT_CONFIG)
        # --- 1. Find Title Exact Word Timings ---
        title_start, title_end = find_title_and_cta(subtitle_path, book_title)
        if not book_title or not book_title.strip():
            logger.warning("⚠️ No book_title provided — using percentage-based timing for book cover")
        elif title_start is None:
            logger.warning(f"⚠️ Book title '{book_title}' not found in audio — using percentage-based timing")
        # Cut to the cover when the title starts; show the CTA once it ends.
        book_appears_at = title_start if title_start is not None else audio_duration * (1 - VIDEO_CONFIG['promo_percent'])
        box_appears_at = title_end if title_end is not None else book_appears_at + 1.5
        if title_start is not None:
            logger.info(f"Hard cut to Book Cover at {title_start:.2f}s")
            logger.info(f"Book title silenced in subtitles.")
            logger.info(f"CTA text starts exactly at {title_end:.2f}s")
        # --- 2. Prepare Dynamic CTA Text (JSON) ---
        logger.info("Generating dynamic CTA...")
        cta_font_size = int(video_width * 0.066)  # CTA text scales with video width
        cta_ass_path = create_cta_ass_from_json(
            subtitle_path, temp_dir, box_appears_at,
            cta_font_size, video_width, video_height, highlight_color
        )
        # Escape the path for use inside an ffmpeg filter-graph expression.
        cta_sub_escaped = cta_ass_path.replace('\\', '/').replace(':', '\\:')
        # --- 3. Process Main Subtitles (JSON) ---
        if enable_highlight:
            logger.info(f"✨ Processing JSON subtitles...")
            body_start_time = first_sub_end if has_reddit_template else 0.0
            main_subtitle_path = create_body_ass_from_json(
                subtitle_path, temp_dir, highlight_color, font_size,
                start_time_sec=body_start_time, config=SUBTITLE_CONFIG,
                stop_time_sec=book_appears_at  # Stops EXACTLY before the title is spoken
            )
        else:
            # NOTE(review): this hands the raw JSON path to the `ass` filter below,
            # which ffmpeg will likely reject — confirm the non-highlight path is
            # actually exercised.
            main_subtitle_path = subtitle_path
        main_sub_escaped = main_subtitle_path.replace('\\', '/').replace(':', '\\:')
        book_cover_path, book_error = validate_book_cover_input(book_cover_file, book_cover_url, book_cover_base64, book_id, temp_dir)
        if book_error: return None, book_error
        has_book_cover = book_cover_path is not None
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(temp_dir, f"output_{timestamp}.mp4")
        if has_book_cover:
            try:
                fade_starts_at = audio_duration * VIDEO_CONFIG['fade_start_percent']
                fade_ends_at = audio_duration * VIDEO_CONFIG['fade_end_percent']
                # Safety net: If the book title is spoken BEFORE the fade is supposed to end,
                # we shorten the fade so it doesn't overlap the book cover cut.
                if fade_ends_at > book_appears_at:
                    fade_ends_at = book_appears_at
                    fade_starts_at = min(fade_starts_at, fade_ends_at - 1.0)
                fade_out_duration = fade_ends_at - fade_starts_at
                solid_color_duration = max(0, book_appears_at - fade_ends_at)
                main_video_duration = fade_ends_at
                cover_segment_duration = audio_duration - book_appears_at
                fade_color_hex = "#dacfc3"  # Book page type color
                # 1. Main Segment (background video fading into sandal color)
                main_segment_path = os.path.join(temp_dir, f"main_{timestamp}.mp4")
                cmd_main = ["ffmpeg", "-stream_loop", "-1", "-i", video_path, "-t", str(main_video_duration), "-vf", f"fps={video_fps},scale={video_width}:{video_height},fade=t=out:st={fade_starts_at}:d={fade_out_duration}:c={fade_color_hex}", *get_intermediate_encode_flags(crf_quality), "-pix_fmt", "yuv420p", "-an", "-y", main_segment_path]
                # 2. Solid Color Segment (Holds the sandal color until the hard cut)
                solid_color_path = None
                cmd_solid = None
                if solid_color_duration > 0:
                    solid_color_path = os.path.join(temp_dir, f"solid_{timestamp}.mp4")
                    cmd_solid = ["ffmpeg", "-f", "lavfi", "-i", f"color=c={fade_color_hex}:s={video_width}x{video_height}:d={solid_color_duration}:r={video_fps}", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "51", "-pix_fmt", "yuv420p", "-y", solid_color_path]
                # 3. Book Cover Segment (Hard cut triggered exactly when title is spoken)
                cover_segment_path = os.path.join(temp_dir, f"cover_{timestamp}.mp4")
                cmd_cover = ["ffmpeg", "-loop", "1", "-i", book_cover_path, "-t", str(cover_segment_duration), "-vf", f"scale={video_width}:{video_height},setsar=1,fps={video_fps}", *get_intermediate_encode_flags(crf_quality), "-pix_fmt", "yuv420p", "-an", "-y", cover_segment_path]
                def run_ffmpeg(cmd, name):
                    # Thread-pool worker; returns (segment name, seconds taken).
                    t = time.time()
                    subprocess.run(cmd, check=True, capture_output=True, text=True, env=ffmpeg_env)
                    elapsed = time.time() - t
                    return name, elapsed
                jobs = {"main": cmd_main, "cover": cmd_cover}
                if cmd_solid:
                    jobs["solid"] = cmd_solid
                # Render the independent segments concurrently.
                parallel_start = time.time()
                with ThreadPoolExecutor(max_workers=3) as pool:
                    futures = {pool.submit(run_ffmpeg, cmd, name): name for name, cmd in jobs.items()}
                    for future in as_completed(futures):
                        name = futures[future]
                        try:
                            name, elapsed = future.result()
                            logger.info(f" ✅ {name} segment: {elapsed:.1f}s")
                        except Exception as e:
                            raise Exception(f"{name} segment failed: {str(e)}")
                parallel_total = time.time() - parallel_start
                logger.info(f" ⚡ Parallel total: {parallel_total:.1f}s\n")
                # 4. Stitch them all together
                concat_list_path = os.path.join(temp_dir, f"concat_{timestamp}.txt")
                with open(concat_list_path, 'w') as f:
                    f.write(f"file '{main_segment_path}'\n")
                    if solid_color_path:
                        f.write(f"file '{solid_color_path}'\n")
                    f.write(f"file '{cover_segment_path}'\n")
                #--- 5. Build the Filter Graph (Subtitles, Overlays & SPEEDUP) ---
                input_cmd = ["ffmpeg", "-f", "concat", "-safe", "0", "-i", concat_list_path]
                curr_idx = 1
                curr_stream = "[0:v]"
                if has_reddit_template:
                    input_cmd += ["-loop", "1", "-i", reddit_card_path]
                    filter_complex = f"[{curr_idx}:v]scale={video_width}*{VIDEO_CONFIG['reddit_scale_percent']}:-1[reddit];{curr_stream}[reddit]overlay=(W-w)/2:(H-h)/2:enable='between(t,{first_sub_start},{first_sub_end})'[v1];"
                    curr_stream, curr_idx = "[v1]", curr_idx + 1
                else:
                    filter_complex = f"{curr_stream}copy[v1];"; curr_stream = "[v1]"
                # 1. Burn in Main Subtitles
                filter_complex += f"{curr_stream}ass={main_sub_escaped}[v2];"; curr_stream = "[v2]"
                # 2. Burn in CTA Subtitles (Straight to v_final - NO DUPLICATES)
                if cta_ass_path:
                    filter_complex += f"{curr_stream}ass={cta_sub_escaped}[v_final]"
                else:
                    filter_complex += f"{curr_stream}copy[v_final]"
                input_cmd += ["-i", audio_path]
                if _hw_encoder:
                    final_encode = ["-c:v", _hw_encoder, "-q:v", _hw_quality]
                else:
                    final_encode = [
                        "-c:v", "libx264", "-crf", str(crf_quality),
                        "-preset", VIDEO_CONFIG['encoding_preset'],
                    ]
                cmd_final = input_cmd + [
                    "-filter_complex", filter_complex,
                    "-map", "[v_final]", "-map", f"{curr_idx}:a",
                    *final_encode,
                    "-threads", "0",
                    "-c:a", "aac", "-pix_fmt", "yuv420p", "-shortest", "-y", output_path
                ]
                logger.info("Rendering final synchronized video...")
                final_start = time.time()
                subprocess.run(cmd_final, check=True, capture_output=True, text=True, env=ffmpeg_env)
                final_elapsed = time.time() - final_start
                logger.info(f" ✅ Final assembly: {final_elapsed:.1f}s")
            except Exception as e: return None, f"❌ Error: {str(e)}"
        # Without a book cover nothing is rendered, so this reports failure.
        if os.path.exists(output_path): return output_path, f"✅ Success!"
        else: return None, "❌ Output not created"
    except Exception as e: return None, f"❌ Error: {str(e)}"
def verify_fonts():
    """Fail fast at startup if any required font is missing or unloadable.

    For each font named in REDDIT_CONFIG / SUBTITLE_CONFIG, looks in two
    candidate locations (./fonts/<file> first, then next to this script)
    and actually loads the file with PIL to catch corrupt fonts, not just
    missing ones.

    Raises:
        RuntimeError: if a font can neither be found nor loaded; the
            message lists what IS available in fonts/ to aid debugging.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    fonts_dir = os.path.join(script_dir, 'fonts')
    logger.info("🔎 FONT VERIFICATION")
    for name, config in [("Reddit", REDDIT_CONFIG), ("Subtitle", SUBTITLE_CONFIG)]:
        font_file = config['font_file']
        paths = [
            os.path.join(fonts_dir, font_file),
            os.path.join(script_dir, font_file),
        ]
        found = False
        for fp in paths:
            if not os.path.exists(fp):
                continue
            try:
                # Loading at an arbitrary size proves the file is a valid
                # font, not merely that it exists on disk.
                ImageFont.truetype(fp, 40)
                logger.info(f"✅ {name} font: {fp}")
                found = True
                break
            except Exception as e:
                # Fix: a present-but-broken font is a warning condition,
                # not routine info (the message itself carries ⚠️).
                logger.warning(f"⚠️ {name} font exists but broken: {fp} — {e}")
        if not found:
            available = os.listdir(fonts_dir) if os.path.exists(fonts_dir) else 'directory missing'
            raise RuntimeError(
                f"❌ {name} font '{font_file}' not found. "
                f"Available in fonts/: {available}"
            )
# Fail fast: abort startup entirely if required font assets are missing
# or corrupt, rather than failing mid-render later.
verify_fonts()
app = FastAPI(title="Video Stitcher API")
# Open CORS policy: any origin/method/header may call the API. Acceptable
# for an internal automation service; tighten if exposed publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class StitchErrorResponse(BaseModel):
    """JSON body returned (HTTP 400) when video stitching fails."""
    # Example payload surfaced in the generated OpenAPI schema.
    model_config = {"json_schema_extra": {"examples": [{"status": "failed", "message": "❌ FFmpeg error", "run_time": "0m 5s"}]}}
    # Always "failed" on this response path.
    status: str
    # Human-readable error description (emoji-prefixed, from stitch_media).
    message: str
    # Elapsed wall time formatted as "Xm Ys".
    run_time: str
async def _save_upload_to_temp(upload_file: UploadFile, temp_dir: str) -> str:
    """Persist an uploaded file into *temp_dir* and return the saved path.

    Args:
        upload_file: the incoming multipart upload.
        temp_dir: existing directory to write into.

    Returns:
        Path of the written file inside temp_dir.
    """
    # basename() strips any client-supplied directory components (path
    # traversal guard). Fall back to a fixed name when the client sent no
    # filename at all — otherwise os.path.join(temp_dir, '') would point
    # at the directory itself and open(..., 'wb') would raise.
    filename = os.path.basename(upload_file.filename or '') or 'upload.bin'
    dest_path = os.path.join(temp_dir, filename)
    content = await upload_file.read()
    with open(dest_path, 'wb') as f:
        f.write(content)
    return dest_path
def delayed_cleanup(temp_dir, delay=10):
    """Wait *delay* seconds, then remove *temp_dir* (best effort).

    Intended to run as a post-response background task: the pause gives
    the client time to finish downloading before the files disappear.
    Removal errors are deliberately ignored.
    """
    time.sleep(delay)
    shutil.rmtree(temp_dir, ignore_errors=True)
async def stitch_upload(
    request: Request,
    video_file: Optional[UploadFile] = File(None),
    video_url: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    audio_url: Optional[str] = Form(None),
    subtitle_file: Optional[UploadFile] = File(None),
    subtitle_url: Optional[str] = Form(None),
    book_cover_file: Optional[UploadFile] = File(None),
    book_cover_url: Optional[str] = Form(None),
    book_cover_base64: Optional[str] = Form(None),
    book_id: Optional[str] = Form(None),
    book_title: Optional[str] = Form(None),
    enable_highlight: bool = Form(True),
    highlight_color: str = Form('yellow'),
    font_size: int = Form(10),
    crf_quality: int = Form(23),
):
    """Accept media (upload, URL, or base64), stitch it, return the MP4.

    Each media input may arrive as an upload or a URL; the book cover may
    additionally arrive as base64 or a book_id. Validation failures raise
    422; stitching failures return a 400 JSON body (StitchErrorResponse
    shape); unexpected errors surface as 500. On success the rendered file
    is streamed back and the working directory is removed afterwards by a
    delayed background task so the download can complete first.
    """
    # --- Format validation (runs before any temp files are created) ---
    # Guard against a missing filename as well as a wrong extension; a
    # None filename previously crashed with AttributeError (HTTP 500).
    if subtitle_file and not (subtitle_file.filename or '').endswith('.json'):
        raise HTTPException(status_code=422, detail="❌ Subtitle must be a .json file")
    if subtitle_url and not subtitle_url.strip().split('?')[0].endswith('.json'):
        raise HTTPException(status_code=422, detail="❌ Subtitle URL must point to a .json file")
    if audio_file and audio_file.content_type not in {"audio/mpeg", "audio/mp3", "audio/wav", "audio/x-wav", "audio/aac", "audio/mp4", "audio/x-m4a"}:
        raise HTTPException(status_code=422, detail=f"❌ Invalid audio format: {audio_file.content_type}")
    # Book cover is mandatory, but may be supplied by any one of four methods.
    has_cover = any([
        book_cover_file is not None,
        book_cover_url and book_cover_url.strip(),
        book_cover_base64 and book_cover_base64.strip(),
        book_id and book_id.strip(),
    ])
    if not has_cover:
        raise HTTPException(
            status_code=422,
            detail="❌ Book cover is required. Provide one of: book_cover_file, book_cover_url, book_cover_base64, or book_id"
        )
    if book_cover_file and book_cover_file.content_type not in {
        "image/jpeg", "image/png", "image/webp"
    }:
        raise HTTPException(status_code=422, detail="❌ Book cover must be jpeg, png, or webp")
    temp_dir = tempfile.mkdtemp()
    # File slots start as None and are filled with on-disk paths below.
    payload = {
        'video_file': None, 'video_url': video_url,
        'audio_file': None, 'audio_url': audio_url,
        'subtitle_file': None, 'subtitle_url': subtitle_url,
        'book_cover_file': None, 'book_cover_url': book_cover_url,
        'book_cover_base64': book_cover_base64, 'book_id': book_id,
        'book_title': book_title,
        'enable_highlight': enable_highlight,
        'highlight_color': highlight_color,
        'font_size': font_size,
        'crf_quality': crf_quality,
    }
    try:
        if video_file is not None:
            payload['video_file'] = await _save_upload_to_temp(video_file, temp_dir)
        if audio_file is not None:
            payload['audio_file'] = await _save_upload_to_temp(audio_file, temp_dir)
        if subtitle_file is not None:
            payload['subtitle_file'] = await _save_upload_to_temp(subtitle_file, temp_dir)
        if book_cover_file is not None:
            payload['book_cover_file'] = await _save_upload_to_temp(book_cover_file, temp_dir)
        start_time = time.time()
        # stitch_media is blocking (subprocess-heavy); run it off the event loop.
        result_path, message = await asyncio.to_thread(
            stitch_media,
            payload.get('video_file'), payload.get('video_url'),
            payload.get('audio_file'), payload.get('audio_url'),
            payload.get('subtitle_file'), payload.get('subtitle_url'),
            payload.get('book_cover_file'), payload.get('book_cover_url'),
            payload.get('book_cover_base64'), payload.get('book_id'),
            payload.get('book_title'),
            payload.get('enable_highlight', True),
            payload.get('highlight_color', 'yellow'),
            payload.get('font_size', 10),
            payload.get('crf_quality', 23),
        )
        run_time = int(time.time() - start_time)
        run_time_fmt = f"{run_time // 60}m {run_time % 60}s"
        if result_path:
            file_size_mb = os.path.getsize(result_path) / (1024 * 1024)
            return FileResponse(
                result_path,
                media_type='video/mp4',
                filename=os.path.basename(result_path),
                headers={
                    "X-Status": "completed",
                    "X-Run-Time": run_time_fmt,
                    "X-File-Size-MB": f"{file_size_mb:.2f}",
                },
                # Cleanup is deferred so the client can finish downloading.
                background=BackgroundTask(delayed_cleanup, temp_dir),
            )
        else:
            # Fix: remove the working directory on the failure path — it
            # was previously leaked here.
            shutil.rmtree(temp_dir, ignore_errors=True)
            return JSONResponse(
                {'status': 'failed', 'message': message, 'run_time': run_time_fmt},
                status_code=400
            )
    except Exception as e:
        # Fix: clean up before surfacing the error — the temp directory
        # was previously leaked on every unexpected failure.
        shutil.rmtree(temp_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=str(e))
async def health():
    """Liveness probe: always reports the service as up."""
    return dict(status="ok")