Spaces:
Sleeping
Sleeping
| import subprocess | |
| import os | |
| import tempfile | |
| import requests | |
| import re | |
| import textwrap | |
| import shutil | |
| import time | |
| import json | |
| from datetime import datetime | |
| from PIL import Image, ImageDraw, ImageFont | |
| import base64 | |
| from io import BytesIO | |
| from thefuzz import fuzz | |
| import asyncio | |
| from io import BytesIO | |
| from typing import Optional | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Request | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field | |
| # ======================================== | |
| # CONFIGURATION SECTION - CUSTOMIZE HERE | |
| # ======================================== | |
| REDDIT_CONFIG = { | |
| 'template_file': 'reddit_template.png', | |
| 'font_file': 'RFDewi-Bold.ttf', | |
| 'font_size_max': 180, | |
| 'font_size_min': 16, | |
| 'text_wrap_width': 35, | |
| 'text_color': 'black', | |
| 'line_spacing': 10, | |
| 'text_box_width_percent': 0.85, | |
| 'text_box_height_percent': 0.65, | |
| 'y_offset': 20, | |
| } | |
| SUBTITLE_CONFIG = { | |
| 'font_file': 'LilitaOne-Regular.ttf', | |
| 'font_name': 'Lilita One', | |
| 'font_size_default': 11, | |
| 'position_alignment': 5, | |
| 'margin_left': 70, | |
| 'margin_right': 80, | |
| 'margin_vertical': 20, | |
| 'line_spacing': 2 | |
| } | |
| VIDEO_CONFIG = { | |
| 'reddit_scale_percent': 0.75, | |
| 'fade_start_percent': 0.70, | |
| 'fade_end_percent': 0.85, | |
| 'promo_percent': 0.094, | |
| 'fade_color_rgb': (218, 207, 195), | |
| } | |
| # ======================================== | |
| # END CONFIGURATION SECTION | |
| # ======================================== | |
| # ========================= | |
| # HELPER FUNCTIONS | |
| # ========================= | |
def sec_to_ass_time(seconds):
    """Format a duration in seconds as an ASS timestamp (H:MM:SS.cs)."""
    total_ms = int(seconds * 1000)
    hours = total_ms // 3600000
    minutes = (total_ms // 60000) % 60
    secs = (total_ms // 1000) % 60
    centis = (total_ms % 1000) // 10
    return f"{hours}:{minutes:02d}:{secs:02d}.{centis:02d}"
def setup_custom_fonts_hf(temp_dir):
    """Stage bundled fonts and return an environment dict for ffmpeg calls.

    Copies every .ttf/.otf file from ./fonts/ (plus the configured Reddit and
    subtitle fonts sitting next to the script) into temp_dir/fonts, then writes
    a minimal fonts.conf and points FONTCONFIG_FILE/FONTCONFIG_PATH at it so
    fontconfig/libass can resolve the custom fonts when subtitles are burned in.
    Falls back to an unmodified copy of os.environ when no fonts are found or
    on any error — best-effort by design, rendering then uses system fonts.
    """
    try:
        fonts_dir = os.path.join(temp_dir, 'fonts')
        os.makedirs(fonts_dir, exist_ok=True)
        script_dir = os.path.dirname(os.path.abspath(__file__))
        repo_fonts_dir = os.path.join(script_dir, 'fonts')
        fonts_to_copy = []
        # Collect everything in the repo's fonts/ directory first.
        if os.path.exists(repo_fonts_dir):
            for font_file in os.listdir(repo_fonts_dir):
                if font_file.endswith(('.ttf', '.otf', '.TTF', '.OTF')):
                    fonts_to_copy.append(os.path.join(repo_fonts_dir, font_file))
        # Then the two specifically-configured fonts, if they live beside the script.
        for item in [REDDIT_CONFIG['font_file'], SUBTITLE_CONFIG['font_file']]:
            font_path = os.path.join(script_dir, item)
            if os.path.exists(font_path) and font_path not in fonts_to_copy:
                fonts_to_copy.append(font_path)
        for src in fonts_to_copy:
            dst = os.path.join(fonts_dir, os.path.basename(src))
            shutil.copy(src, dst)
        if fonts_to_copy:
            # Minimal fontconfig file: one font dir plus a private cache dir.
            fonts_conf = f"""<?xml version="1.0"?>
<fontconfig><dir>{fonts_dir}</dir><cachedir>{temp_dir}/cache</cachedir></fontconfig>"""
            conf_path = os.path.join(temp_dir, 'fonts.conf')
            with open(conf_path, 'w') as f:
                f.write(fonts_conf)
            env = os.environ.copy()
            env['FONTCONFIG_FILE'] = conf_path
            env['FONTCONFIG_PATH'] = temp_dir
            return env
        return os.environ.copy()
    except Exception as e: return os.environ.copy()  # best-effort: never block rendering over fonts
def download_file_from_url(url, output_dir, filename):
    """Stream a remote file to output_dir/filename and return the local path.

    Downloads in 8 KiB chunks so large media never sits fully in memory.

    Raises:
        Exception: wrapping (and chaining) any network or filesystem failure.
    """
    try:
        # Context manager guarantees the streaming connection is released
        # even if writing to disk fails partway through.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            file_path = os.path.join(output_dir, filename)
            with open(file_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return file_path
    except Exception as e:
        # Chain the cause so the underlying requests/OS error stays visible.
        raise Exception(f"Failed to download file: {str(e)}") from e
def download_book_cover(book_id, output_dir):
    """Fetch the Google Books front-cover image for *book_id* into output_dir.

    The downloaded bytes are validated with PIL before the path is returned.

    Raises:
        Exception: wrapping (and chaining) any network or image-validation failure.
    """
    try:
        image_url = f"https://books.google.com/books/publisher/content/images/frontcover/{book_id}"
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()
        image_path = os.path.join(output_dir, 'book_cover.png')
        with open(image_path, 'wb') as f:
            f.write(response.content)
        # Validate the image without leaking the file handle (the previous
        # code left the Image object's file open until GC).
        with Image.open(image_path) as img:
            img.verify()
        return image_path
    except Exception as e:
        raise Exception(f"Failed to download book cover: {str(e)}") from e
def decode_base64_image(base64_string, output_dir):
    """Decode a base64 (optionally data-URL) image payload to a PNG on disk.

    Returns the saved file path.

    Raises:
        Exception: wrapping (and chaining) decode or image-validation failures.
    """
    try:
        # Strip a "data:image/...;base64," prefix when present.
        if ',' in base64_string and 'base64' in base64_string:
            base64_string = base64_string.split(',', 1)[1]
        image_data = base64.b64decode(base64_string.strip())
        # PIL's verify() invalidates the Image object, so the payload is
        # opened twice on purpose: once to validate, once to re-encode.
        Image.open(BytesIO(image_data)).verify()
        output_path = os.path.join(output_dir, f"book_cover_b64_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png")
        Image.open(BytesIO(image_data)).save(output_path, 'PNG')
        return output_path
    except Exception as e:
        raise Exception(f"Base64 decode failed: {str(e)}") from e
def validate_book_cover_input(book_cover_file, book_cover_url, book_cover_base64, book_id, temp_dir):
    """Resolve the (optional) book cover from exactly one of four sources.

    Accepted sources: an uploaded file object/path, a direct image URL, a
    base64 payload, or a Google Books volume id. Returns a tuple of
    (local_path_or_None, error_message_or_None); a cover is optional, so
    (None, None) means "no cover requested".
    """
    provided = {
        'file': book_cover_file is not None,
        'url': bool(book_cover_url and book_cover_url.strip()),
        'b64': bool(book_cover_base64 and book_cover_base64.strip()),
        'id': bool(book_id and book_id.strip()),
    }
    supplied = sum(provided.values())
    if supplied == 0:
        return None, None
    if supplied > 1:
        return None, "β Book Cover: Use only ONE method"
    try:
        if provided['file']:
            source = book_cover_file.name if hasattr(book_cover_file, 'name') else book_cover_file
            return str(source), None
        if provided['url']:
            stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            return download_file_from_url(book_cover_url.strip(), temp_dir, f"book_cover_{stamp}.png"), None
        if provided['b64']:
            return decode_base64_image(book_cover_base64.strip(), temp_dir), None
        if provided['id']:
            return download_book_cover(book_id.strip(), temp_dir), None
    except Exception as e:
        return None, f"β Book cover error: {str(e)}"
    return None, None
def get_video_info(video_path):
    """Probe *video_path* with ffprobe and return (width, height, fps).

    fps comes from the stream's r_frame_rate, which ffprobe reports either as
    a plain number or a rational such as "30000/1001".

    Raises:
        Exception: wrapping (and chaining) any probe or parse failure.
    """
    try:
        cmd_res = ["ffprobe", "-v", "error", "-select_streams", "v:0",
                   "-show_entries", "stream=width,height",
                   "-of", "csv=s=x:p=0", video_path]
        result = subprocess.run(cmd_res, capture_output=True, text=True, check=True)
        width, height = result.stdout.strip().split('x')
        cmd_fps = ["ffprobe", "-v", "error", "-select_streams", "v:0",
                   "-show_entries", "stream=r_frame_rate",
                   "-of", "default=noprint_wrappers=1:nokey=1", video_path]
        result = subprocess.run(cmd_fps, capture_output=True, text=True, check=True)
        fps_str = result.stdout.strip()
        if '/' in fps_str:
            # Rational frame rate, e.g. NTSC "30000/1001" -> 29.97.
            num, den = fps_str.split('/')
            fps = float(num) / float(den)
        else:
            fps = float(fps_str)
        return int(width), int(height), fps
    except Exception as e:
        # Chain the cause so the real ffprobe failure stays in the traceback.
        raise Exception(f"Failed to get video info: {str(e)}") from e
def get_audio_duration(audio_path):
    """Return the duration of *audio_path* in seconds, as reported by ffprobe.

    Raises:
        Exception: wrapping (and chaining) any probe or parse failure.
    """
    try:
        cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
               "-of", "default=noprint_wrappers=1:nokey=1", audio_path]
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return float(result.stdout.strip())
    except Exception as e:
        # Chain the cause so the real ffprobe failure stays in the traceback.
        raise Exception(f"Failed to get audio duration: {str(e)}") from e
def create_reddit_card_with_text(template_path, hook_text, output_dir, config=REDDIT_CONFIG):
    """Burn *hook_text* onto the Reddit card template and save a composite PNG.

    The text is word-wrapped, auto-sized with a character-width heuristic to
    fit the configured text box, then centered with ffmpeg's drawtext filter.

    Returns the path of the rendered image.

    Raises:
        Exception: if the font is missing or ffprobe/ffmpeg fails.
    """
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        # The font may live in ./fonts/ or directly beside the script.
        font_paths = [
            os.path.join(script_dir, 'fonts', config['font_file']),
            os.path.join(script_dir, config['font_file'])
        ]
        font_path = next((fp for fp in font_paths if os.path.exists(fp)), None)
        if not font_path:
            raise Exception(f"Font file not found: {config['font_file']}")
        output_path = os.path.join(output_dir, 'reddit_card_composite.png')
        # Wrap the text manually; embedded newlines become line breaks in drawtext.
        wrapped_text = textwrap.fill(hook_text, width=config['text_wrap_width'])
        # Escape quotes and colons for drawtext filter syntax. (The previous
        # no-op .replace('\n', '\n') was dropped — it changed nothing.)
        escaped_text = wrapped_text.replace("'", "\\'").replace(':', '\\:')
        # Get template dimensions using ffprobe.
        cmd_probe = [
            "ffprobe", "-v", "error", "-select_streams", "v:0",
            "-show_entries", "stream=width,height",
            "-of", "csv=s=x:p=0", template_path
        ]
        result = subprocess.run(cmd_probe, capture_output=True, text=True, check=True)
        temp_w, temp_h = map(int, result.stdout.strip().split('x'))
        # Target text box inside the card.
        box_w = int(temp_w * config['text_box_width_percent'])
        box_h = int(temp_h * config['text_box_height_percent'])
        lines = wrapped_text.split('\n')
        max_line_len = max(len(l) for l in lines)
        # Descending linear scan: the first size whose estimated bounds fit wins.
        # Estimate: average char width ~0.6 * font_size, line height ~1.2 * font_size.
        # BUGFIX: if nothing fits, fall back to the MINIMUM size — the old code
        # left font_size at the maximum, guaranteeing overflow in that case.
        font_size = config['font_size_min']
        for fs in range(config['font_size_max'], config['font_size_min'] - 1, -2):
            est_w = max_line_len * fs * 0.6
            est_h = len(lines) * fs * 1.2
            if est_w <= box_w and est_h <= box_h:
                font_size = fs
                break
        # Build the drawtext filter — x/y expressions center the text block.
        drawtext_filter = (
            f"drawtext=fontfile='{font_path}'"
            f":text='{escaped_text}'"
            f":fontcolor={config['text_color']}"
            f":fontsize={font_size}"
            f":line_spacing={config['line_spacing']}"
            f":x=(w-text_w)/2"
            f":y=(h-text_h)/2+{config['y_offset']}"
        )
        cmd = [
            "ffmpeg", "-i", template_path,
            "-vf", drawtext_filter,
            "-frames:v", "1",
            "-y", output_path
        ]
        subprocess.run(cmd, check=True, capture_output=True, text=True)
        return output_path
    except Exception as e:
        raise Exception(f"Failed to create Reddit card: {str(e)}")
def validate_and_get_file(uploaded_file, url_string, file_type, temp_dir):
    """Resolve a required input from either an upload or a URL (exactly one).

    Returns (local_path, None) on success, or (None, error_message) when the
    caller supplied neither source, both sources, or the download failed.
    """
    upload_given = uploaded_file is not None
    url_given = url_string and url_string.strip()
    if not upload_given and not url_given:
        return None, f"β Please provide {file_type}"
    if upload_given and url_given:
        return None, f"β Use only ONE method for {file_type}"
    if upload_given:
        resolved = uploaded_file.name if hasattr(uploaded_file, 'name') else uploaded_file
        return str(resolved), None
    if url_given:
        try:
            tail = url_string.split('/')[-1] or 'file'
            fname = f"{file_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}_{tail}"
            return download_file_from_url(url_string.strip(), temp_dir, fname), None
        except Exception as e:
            return None, f"β Error downloading {file_type}: {str(e)}"
    return None, "β Unknown error"
| # ============================================ | |
| # JSON LOGIC: PARSERS & SUBTITLE GENERATORS | |
| # ============================================ | |
def extract_first_subtitle(json_path):
    """Return (text, start_sec, end_sec) for the transcript's opening sentence.

    Scans words in order and cuts at the first token ending in '.', '!' or '?';
    this sentence is what gets rendered onto the Reddit card. When no
    sentence-ending punctuation exists, returns all collected words with a
    3.0-second default end time; on any failure returns a placeholder.
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as fh:
            payload = json.load(fh)
        collected = []
        sentence_start = None
        sentence_end = 3.0
        for segment in payload.get('segments', []):
            for word_info in segment.get('words', []):
                token = word_info.get('text', '').strip()
                if not token:
                    continue
                if sentence_start is None:
                    # First real word anchors the overlay's start time.
                    sentence_start = word_info.get('start_time', 0.0)
                collected.append(token)
                if re.search(r'[.!?]$', token):
                    sentence_end = word_info.get('end_time', 3.0)
                    return " ".join(collected), sentence_start, sentence_end
        # Fallback: transcript contained no sentence-ending punctuation at all.
        if collected:
            return " ".join(collected), sentence_start, sentence_end
        return "No subtitle found", 0.0, 3.0
    except Exception as exc:
        print(f"Error extracting first subtitle: {exc}")
        return "No subtitle found", 0.0, 3.0
| # ============================================ | |
| # FINDS BOOK TITLE WORD'S EXACT TIMINGS | |
| # ============================================ | |
def find_title_and_cta(json_path, book_title):
    """Locate the spoken book title and return its (start_sec, end_sec).

    Flattens all transcript words, then fuzzy-matches (thefuzz ratio, 0-100)
    sliding windows of len(title_words), +1 and -1 words against the
    punctuation-stripped title — the size wobble catches transcriptions that
    split or merge words. A best score of at least 85 yields the window's
    exact timings; otherwise (None, None). Ties keep the earliest match
    because only strictly greater scores replace the best.
    """
    try:
        if not book_title or not book_title.strip(): return None, None
        with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f)
        book_title_lower = book_title.lower()
        # Strip punctuation so "Title!" matches "title".
        title_clean = re.sub(r'[^\w\s]', '', book_title_lower).strip()
        book_title_words = title_clean.split()
        window_size = len(book_title_words)
        # Flatten all words with their timings
        all_words = []
        for segment in data.get('segments', []):
            for word_data in segment.get('words', []):
                word_text = word_data.get('text', '').strip()
                if word_text:
                    all_words.append({
                        'text': word_text,
                        'start': word_data.get('start_time', 0.0),
                        'end': word_data.get('end_time', 0.0)
                    })
        best_score = 0
        best_start = None
        best_end = None
        # Sliding window over exact, one-longer, and one-shorter word counts
        # to catch fuzzy/bad transcriptions.
        for w_size in [window_size, window_size + 1, window_size - 1]:
            if w_size <= 0: continue
            for i in range(len(all_words) - w_size + 1):
                window_text = " ".join([w['text'] for w in all_words[i : i + w_size]]).lower()
                window_text_clean = re.sub(r'[^\w\s]', '', window_text).strip()
                score = fuzz.ratio(title_clean, window_text_clean)
                if score > best_score:
                    best_score = score
                    best_start = all_words[i]['start']
                    best_end = all_words[i + w_size - 1]['end']
        # Only accept a strong match; weak matches would mistime the cover cut.
        if best_score >= 85:
            return best_start, best_end
        return None, None
    except Exception as e:
        print(f"Error finding title: {e}")
        return None, None
def create_body_ass_from_json(json_path, output_dir, highlight_color='yellow',
                              font_size=None, start_time_sec=0.0, config=SUBTITLE_CONFIG,
                              stop_time_sec=None):
    """Build the main word-highlight .ass subtitle track from word-timing JSON.

    Words are grouped into on-screen chunks whose size starts at 1 and grows
    by 4 per chunk (1, 5, 9, ...) capped at 50. Within each chunk, one
    Dialogue event per word re-renders the whole chunk with that word
    highlighted, producing a karaoke-style effect.

    Args:
        json_path: transcript JSON with segments[].words[] start/end times (seconds).
        output_dir: directory that receives body_subtitles.ass.
        highlight_color: key into the colour map (unknown keys fall back to yellow).
        font_size: overrides config['font_size_default'] when given.
        start_time_sec: skip words starting before this (0.1s tolerance).
        config: ASS style settings (font, margins, alignment).
        stop_time_sec: skip words starting at/after this (0.1s tolerance).

    Returns the path of the written .ass file.
    """
    if font_size is None: font_size = config['font_size_default']
    # Colour name -> (highlight outline/box colour, highlighted text colour),
    # in ASS &HAABBGGRR hex ordering (e.g. &H0000FFFF = yellow).
    color_map = {'yellow': ('&H00000000', '&H0000FFFF'), 'orange': ('&H0000A5FF', '&H00000000'), 'green': ('&H0000FF00', '&H00000000'), 'cyan': ('&H00FFFF00', '&H00000000'), 'pink': ('&H00FF69B4', '&H00000000'), 'red': ('&H000000FF', '&H00FFFFFF'), 'blue': ('&H00FF0000', '&H00FFFFFF')}
    highlight_bg, highlight_text = color_map.get(highlight_color.lower(), ('&H00000000', '&H0000FFFF'))
    ass_path = os.path.join(output_dir, 'body_subtitles.ass')
    ass_header = f"""[Script Info]
Title: Body JSON Subtitles
ScriptType: v4.00+
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{config['font_name']},{font_size},&H00FFFFFF,&H00FFFFFF,&H00000000,&H80000000,0,0,0,0,100,85,0,0,1,3,1,{config['position_alignment']},{config['margin_left']},{config['margin_right']},{config['margin_vertical']},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"""
    with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f)
    all_words = []
    for segment in data.get('segments', []):
        for word_data in segment.get('words', []):
            word_text = word_data.get('text', '').strip()
            # NOTE: value is in seconds despite the "_ms" name.
            start_ms = word_data.get('start_time', 0)
            # Keep only words inside the [start_time_sec, stop_time_sec) window.
            if start_ms < start_time_sec - 0.1: continue
            if stop_time_sec is not None and start_ms >= stop_time_sec - 0.1: continue
            if word_text:
                all_words.append({'word': word_text, 'start': start_ms, 'end': word_data.get('end_time', 0)})
    chunks = []
    i = 0
    current_chunk_size = 1
    max_chunk_size = 50
    while i < len(all_words):
        remaining = len(all_words) - i
        take = min(current_chunk_size, remaining)
        chunks.append(all_words[i : i + take])
        i += take
        if current_chunk_size < max_chunk_size:
            current_chunk_size = min(current_chunk_size + 4, max_chunk_size)
    ass_events = []
    for chunk in chunks:
        chunk_text_only = [item['word'] for item in chunk]
        frame_end = chunk[-1]['end']
        for idx, info in enumerate(chunk):
            # Each word's highlight lasts until the next word starts (or the
            # chunk's final end time for the last word).
            w_start = info['start']
            w_end = chunk[idx+1]['start'] if idx + 1 < len(chunk) else frame_end
            text_parts = []
            for j, word_str in enumerate(chunk_text_only):
                # \c sets text colour, \3c the outline colour, \bord5 the
                # outline width; \r resets to the Default style afterwards.
                if j == idx: text_parts.append(f"{{\\c{highlight_text}\\3c{highlight_bg}\\bord5}}{word_str}{{\\r}}")
                else: text_parts.append(word_str)
            ass_events.append(f"Dialogue: 0,{sec_to_ass_time(w_start)},{sec_to_ass_time(w_end)},Default,,0,0,0,,{' '.join(text_parts)}")
    with open(ass_path, 'w', encoding='utf-8') as f:
        f.write(ass_header + '\n'.join(ass_events))
    return ass_path
def create_cta_ass_from_json(json_path, output_dir, start_sec, font_size, video_width, video_height, highlight_color='yellow', config=SUBTITLE_CONFIG, words_per_frame=10):
    """Build the boxed, Instagram-style .ass track for the call-to-action.

    Only words starting at/after *start_sec* are included. Words are grouped
    into fixed-size chunks (words_per_frame), and consecutive "Book" +
    "Access..." tokens are merged into the brand name "BookXcess". The style
    uses BorderStyle=3 with a thick outline, which renders as a tight opaque
    box behind the text.

    Returns the path of the written .ass file.
    """
    # Colour name -> (box colour, highlighted text colour) in ASS &HAABBGGRR.
    color_map = {
        'yellow': ('&H00000000', '&H0000FFFF'), 'orange': ('&H0000A5FF', '&H00000000'),
        'green': ('&H0000FF00', '&H00000000'), 'cyan': ('&H00FFFF00', '&H00000000'),
        'pink': ('&H00FF69B4', '&H00000000'), 'red': ('&H000000FF', '&H00FFFFFF'),
        'blue': ('&H00FF0000', '&H00FFFFFF')
    }
    highlight_bg, highlight_text = color_map.get(highlight_color.lower(), ('&H00000000', '&H0000FFFF'))
    # Side margins scale with the video width so the box stays centered.
    margin_lr = int(video_width * 0.125) + 40
    ass_path = os.path.join(output_dir, 'cta_subtitles.ass')
    # Style logic: WrapStyle=1, BorderStyle=3, Outline=10 (tight Instagram box).
    ass_header = f"""[Script Info]
Title: CTA JSON Subtitles
ScriptType: v4.00+
PlayResX: {video_width}
PlayResY: {video_height}
WrapStyle: 1
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{config['font_name']},{font_size},&H00FFFFFF,&H00FFFFFF,&H00000000,&H80000000,0,0,0,0,100,85,0,0,3,10,0,5,{margin_lr},{margin_lr},0,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"""
    with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f)
    all_cta_words = []
    for segment in data.get('segments', []):
        for word_data in segment.get('words', []):
            word_text = word_data.get('text', '').strip()
            # NOTE: value is in seconds despite the "_ms" name.
            start_ms = word_data.get('start_time', 0)
            if start_ms < start_sec - 0.1: continue  # Skip words before the CTA starts
            if word_text:
                # Merge "Book" + "Access" into the brand name "BookXcess".
                if word_text.lower().startswith('access') and len(all_cta_words) > 0 and all_cta_words[-1]['word'].lower() == 'book':
                    # 'access' is 6 characters; keep any trailing punctuation
                    # (like commas or periods) that followed it.
                    punctuation = word_text[6:]
                    all_cta_words[-1]['word'] = 'BookXcess' + punctuation
                    # Extend the highlight time to cover both words
                    all_cta_words[-1]['end'] = word_data.get('end_time', 0)
                    continue  # Skip adding "Access" as a separate word
                all_cta_words.append({'word': word_text, 'start': start_ms, 'end': word_data.get('end_time', 0)})
    chunks = []
    i = 0
    while i < len(all_cta_words):
        remaining = len(all_cta_words) - i
        # Absorb up to 2 leftover words into the final chunk so the last
        # frame never shows a tiny 1-2 word orphan.
        take = remaining if words_per_frame < remaining <= words_per_frame + 2 else min(words_per_frame, remaining)
        chunks.append(all_cta_words[i : i + take])
        i += take
    ass_events = []
    for chunk in chunks:
        chunk_text_only = [item['word'] for item in chunk]
        frame_end = chunk[-1]['end']
        for idx, info in enumerate(chunk):
            # Highlight each word until the next one starts (chunk end for the last).
            w_start = info['start']
            w_end = chunk[idx+1]['start'] if idx + 1 < len(chunk) else frame_end
            text_parts = []
            for j, word_str in enumerate(chunk_text_only):
                # Only the text colour changes here; the box comes from the style.
                if j == idx: text_parts.append(f"{{\\c{highlight_text}}}{word_str}{{\\r}}")
                else: text_parts.append(word_str)
            # Layer 1 so the CTA renders above the body subtitle track.
            ass_events.append(f"Dialogue: 1,{sec_to_ass_time(w_start)},{sec_to_ass_time(w_end)},Default,,0,0,0,,{' '.join(text_parts)}")
    with open(ass_path, 'w', encoding='utf-8') as f:
        f.write(ass_header + '\n'.join(ass_events))
    return ass_path
| # ========================= | |
| # MAIN STITCH FUNCTION | |
| # ========================= | |
def stitch_media(video_file, video_url, audio_file, audio_url, subtitle_file, subtitle_url, book_cover_file, book_cover_url, book_cover_base64, book_id, book_title, enable_highlight, highlight_color, font_size, crf_quality=23):
    """End-to-end render pipeline.

    Steps: resolve the three required inputs (video/audio/subtitle JSON),
    speed audio up 1.3x and rescale the JSON word timings to match, render the
    Reddit-card intro overlay from the first sentence, generate body and CTA
    .ass tracks, hard-cut to the book cover when the title is spoken (with a
    colour fade leading into it), and encode the final MP4 with ffmpeg.

    Returns (output_path, status_message) on success or (None, error_message).

    NOTE(review): temp_dir is never removed, so every call leaks its working
    directory. Also, when no book cover is resolved the whole render branch is
    skipped and the function always returns "Output not created" — confirm
    whether a cover-less render path was intended.
    """
    temp_dir = tempfile.mkdtemp()
    status_msg = "π Starting video stitching...\n"
    try:
        # Fontconfig environment pointing at bundled fonts for libass/ffmpeg.
        ffmpeg_env = setup_custom_fonts_hf(temp_dir)
        video_path, v_err = validate_and_get_file(video_file, video_url, 'video', temp_dir)
        if v_err: return None, v_err
        audio_path, a_err = validate_and_get_file(audio_file, audio_url, 'audio', temp_dir)
        if a_err: return None, a_err
        subtitle_path, s_err = validate_and_get_file(subtitle_file, subtitle_url, 'subtitle', temp_dir)
        if s_err: return None, s_err
        # PRE-PROCESS SPEED HACK: everything downstream runs at 1.3x speed.
        speed_factor = 1.3
        # 1. Physically speed up the audio file.
        fast_audio = os.path.join(temp_dir, f"fast_audio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3")
        subprocess.run(["ffmpeg", "-v", "error", "-y", "-i", audio_path, "-filter:a", f"atempo={speed_factor}", fast_audio], check=True)
        audio_path = fast_audio  # Downstream code now uses the sped-up audio.
        # 2. Physically shrink the JSON timestamps by the same factor so the
        # subtitles stay in sync with the faster audio.
        fast_json = os.path.join(temp_dir, f"fast_subs_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(subtitle_path, 'r', encoding='utf-8') as f: json_data = json.load(f)
        for segment in json_data.get('segments', []):
            segment['start_time'] = segment.get('start_time', 0) / speed_factor
            segment['end_time'] = segment.get('end_time', 0) / speed_factor
            for word in segment.get('words', []):
                word['start_time'] = word.get('start_time', 0) / speed_factor
                word['end_time'] = word.get('end_time', 0) / speed_factor
        with open(fast_json, 'w', encoding='utf-8') as f: json.dump(json_data, f)
        subtitle_path = fast_json  # Downstream code now uses the rescaled timings.
        video_width, video_height, video_fps = get_video_info(video_path)
        audio_duration = get_audio_duration(audio_path)  # Duration of the sped-up audio.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        reddit_template_path = os.path.join(script_dir, REDDIT_CONFIG['template_file'])
        has_reddit_template = os.path.exists(reddit_template_path)
        first_sub_start = 0
        first_sub_end = 0
        if has_reddit_template:
            try:
                # First sentence becomes the card text; its timing window
                # drives the overlay's enable= expression below.
                first_sub_text, first_sub_start, first_sub_end = extract_first_subtitle(subtitle_path)
                status_msg += f"\nπ± Reddit Overlay: '{first_sub_text[:30]}...'\n"
                reddit_card_path = create_reddit_card_with_text(reddit_template_path, first_sub_text, temp_dir, REDDIT_CONFIG)
            except Exception as e:
                status_msg += f" β’ β οΈ Reddit card failed: {str(e)}\n"
                has_reddit_template = False
        # --- 1. Find Title Exact Word Timings ---
        title_start, title_end = find_title_and_cta(subtitle_path, book_title)
        # Fall back to fixed fractions of the runtime when fuzzy matching fails.
        book_appears_at = title_start if title_start is not None else audio_duration * (1 - VIDEO_CONFIG['promo_percent'])
        box_appears_at = title_end if title_end is not None else book_appears_at + 1.5
        if title_start is not None:
            status_msg += f"\nπ Hard cut to Book Cover at {title_start:.2f}s\n"
            status_msg += f"π€« Book title silenced in subtitles.\n"
            status_msg += f"π€ CTA text starts exactly at {title_end:.2f}s\n"
        # --- 2. Prepare Dynamic CTA Text (JSON) ---
        status_msg += "π€ Generating Instagram-style dynamic CTA...\n"
        cta_font_size = int(video_width * 0.060)  # CTA size scales with video width.
        cta_ass_path = create_cta_ass_from_json(
            subtitle_path, temp_dir, box_appears_at,
            cta_font_size, video_width, video_height, highlight_color
        )
        # Escape backslashes and colons for ffmpeg filter-argument syntax.
        cta_sub_escaped = cta_ass_path.replace('\\', '/').replace(':', '\\:')
        # --- 3. Process Main Subtitles (JSON) ---
        if enable_highlight:
            status_msg += f"\n⨠Processing JSON subtitles...\n"
            # Body subtitles begin after the Reddit card window ends.
            body_start_time = first_sub_end if has_reddit_template else 0.0
            main_subtitle_path = create_body_ass_from_json(
                subtitle_path, temp_dir, highlight_color, font_size,
                start_time_sec=body_start_time, config=SUBTITLE_CONFIG,
                stop_time_sec=book_appears_at  # Stops EXACTLY before the title is spoken
            )
        else:
            main_subtitle_path = subtitle_path
        main_sub_escaped = main_subtitle_path.replace('\\', '/').replace(':', '\\:')
        book_cover_path, book_error = validate_book_cover_input(book_cover_file, book_cover_url, book_cover_base64, book_id, temp_dir)
        if book_error: return None, book_error
        has_book_cover = book_cover_path is not None
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_path = os.path.join(temp_dir, f"final_{timestamp}.mp4")
        if has_book_cover:
            try:
                fade_starts_at = audio_duration * VIDEO_CONFIG['fade_start_percent']
                fade_ends_at = audio_duration * VIDEO_CONFIG['fade_end_percent']
                # Safety net: if the book title is spoken BEFORE the fade is supposed to end,
                # shorten the fade so it doesn't overlap the book cover cut.
                if fade_ends_at > book_appears_at:
                    fade_ends_at = book_appears_at
                    fade_starts_at = min(fade_starts_at, fade_ends_at - 1.0)
                fade_out_duration = fade_ends_at - fade_starts_at
                solid_color_duration = max(0, book_appears_at - fade_ends_at)
                main_video_duration = fade_ends_at
                cover_segment_duration = audio_duration - book_appears_at
                fade_color_hex = "#dacfc3"  # Book-page-like colour (matches VIDEO_CONFIG's RGB tuple)
                # 1. Main segment: looped background video fading into the solid colour.
                main_segment_path = os.path.join(temp_dir, f"main_{timestamp}.mp4")
                cmd_main = ["ffmpeg", "-stream_loop", "-1", "-i", video_path, "-t", str(main_video_duration), "-vf", f"fps={video_fps},scale={video_width}:{video_height},fade=t=out:st={fade_starts_at}:d={fade_out_duration}:c={fade_color_hex}", "-c:v", "libx264", "-crf", str(crf_quality), "-pix_fmt", "yuv420p", "-an", "-y", main_segment_path]
                subprocess.run(cmd_main, check=True, capture_output=True, text=True, env=ffmpeg_env)
                # 2. Solid colour segment: holds the colour until the hard cut.
                solid_color_path = None
                if solid_color_duration > 0:
                    solid_color_path = os.path.join(temp_dir, f"solid_{timestamp}.mp4")
                    cmd_solid = ["ffmpeg", "-f", "lavfi", "-i", f"color=c={fade_color_hex}:s={video_width}x{video_height}:d={solid_color_duration}:r={video_fps}", "-c:v", "libx264", "-crf", str(crf_quality), "-pix_fmt", "yuv420p", "-y", solid_color_path]
                    subprocess.run(cmd_solid, check=True, capture_output=True, text=True, env=ffmpeg_env)
                # 3. Book cover segment: hard cut triggered exactly when the title is spoken.
                cover_segment_path = os.path.join(temp_dir, f"cover_{timestamp}.mp4")
                cmd_cover = ["ffmpeg", "-loop", "1", "-i", book_cover_path, "-t", str(cover_segment_duration), "-vf", f"scale={video_width}:{video_height},setsar=1,fps={video_fps}", "-c:v", "libx264", "-crf", str(crf_quality), "-pix_fmt", "yuv420p", "-an", "-y", cover_segment_path]
                subprocess.run(cmd_cover, check=True, capture_output=True, text=True, env=ffmpeg_env)
                # 4. Concat list stitching the three segments in order.
                concat_list_path = os.path.join(temp_dir, f"concat_{timestamp}.txt")
                with open(concat_list_path, 'w') as f:
                    f.write(f"file '{main_segment_path}'\n")
                    if solid_color_path:
                        f.write(f"file '{solid_color_path}'\n")
                    f.write(f"file '{cover_segment_path}'\n")
                # --- 5. Build the filter graph (subtitles + overlays) ---
                input_cmd = ["ffmpeg", "-f", "concat", "-safe", "0", "-i", concat_list_path]
                curr_idx = 1
                curr_stream = "[0:v]"
                if has_reddit_template:
                    input_cmd += ["-loop", "1", "-i", reddit_card_path]
                    # Scale the card relative to the video, then overlay it only
                    # during the first sentence's time window.
                    filter_complex = f"[{curr_idx}:v]scale={video_width}*{VIDEO_CONFIG['reddit_scale_percent']}:-1[reddit];{curr_stream}[reddit]overlay=(W-w)/2:(H-h)/2:enable='between(t,{first_sub_start},{first_sub_end})'[v1];"
                    curr_stream, curr_idx = "[v1]", curr_idx + 1
                else:
                    filter_complex = f"{curr_stream}copy[v1];"; curr_stream = "[v1]"
                # Burn in the main subtitle track.
                filter_complex += f"{curr_stream}ass={main_sub_escaped}[v2];"; curr_stream = "[v2]"
                # Burn in the CTA track straight into the final label.
                if cta_ass_path:
                    filter_complex += f"{curr_stream}ass={cta_sub_escaped}[v_final]"
                else:
                    filter_complex += f"{curr_stream}copy[v_final]"
                input_cmd += ["-i", audio_path]
                # curr_idx now points at the audio input added above.
                cmd_final = input_cmd + [
                    "-filter_complex", filter_complex,
                    "-map", "[v_final]", "-map", f"{curr_idx}:a",
                    "-c:v", "libx264", "-crf", str(crf_quality),
                    "-c:a", "aac", "-pix_fmt", "yuv420p", "-shortest", "-y", output_path
                ]
                status_msg += "π¬ Rendering final synchronized video...\n"
                subprocess.run(cmd_final, check=True, capture_output=True, text=True, env=ffmpeg_env)
            except Exception as e:
                return None, f"β Book cover processing error: {str(e)}"
        if os.path.exists(output_path): return output_path, f"β Success!"
        else: return None, "β Output not created"
    except Exception as e: return None, f"β Error: {str(e)}"
# FastAPI application instance with fully open CORS.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers under the CORS spec (a wildcard origin cannot be
# credentialed) — confirm whether credentials are actually needed here.
app = FastAPI(title="Video Stitcher API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class StitchErrorResponse(BaseModel):
    """Schema of the JSON body returned when stitching fails (status 400)."""
    # NOTE(review): `example=` as a Field kwarg is Pydantic v1 style; under
    # Pydantic v2 it is only kept as schema extra — confirm installed version.
    status: str = Field(..., example="failed")
    message: str = Field(..., example="β FFmpeg error: ...")
    run_time: str = Field(..., example="0m 5s")
def _save_upload_to_temp(upload_file: UploadFile, temp_dir: str) -> str:
    """Persist an uploaded file into *temp_dir* and return the saved path.

    The client-supplied filename is reduced to its basename to prevent path
    traversal, and the payload is streamed to disk with shutil.copyfileobj
    instead of buffering the whole upload in memory (the previous
    `.file.read()` loaded arbitrarily large videos into RAM at once).
    """
    filename = os.path.basename(upload_file.filename)
    dest_path = os.path.join(temp_dir, filename)
    with open(dest_path, 'wb') as f:
        shutil.copyfileobj(upload_file.file, f)
    return dest_path
# NOTE(review): this coroutine had no route decorator and therefore was never
# registered with the FastAPI app; "/stitch" is an assumed path — confirm the
# intended route before deploying.
@app.post("/stitch")
async def stitch_upload(
    request: Request,
    video_file: Optional[UploadFile] = File(None),
    video_url: Optional[str] = Form(None),
    audio_file: Optional[UploadFile] = File(None),
    audio_url: Optional[str] = Form(None),
    subtitle_file: Optional[UploadFile] = File(None),
    subtitle_url: Optional[str] = Form(None),
    book_cover_file: Optional[UploadFile] = File(None),
    book_cover_url: Optional[str] = Form(None),
    book_cover_base64: Optional[str] = Form(None),
    book_id: Optional[str] = Form(None),
    book_title: Optional[str] = Form(None),
    enable_highlight: bool = Form(True),
    highlight_color: str = Form('yellow'),
    font_size: int = Form(10),
    crf_quality: int = Form(23),
):
    """HTTP entry point: validate inputs, persist uploads, run the blocking
    stitch_media pipeline in a worker thread, and stream back the rendered
    MP4 (or a JSON error body with status 400).

    Raises HTTPException 422 for format problems and 500 for unexpected errors.
    """
    # Format validation
    if subtitle_file and not subtitle_file.filename.endswith('.json'):
        raise HTTPException(status_code=422, detail="β Subtitle must be a .json file")
    if subtitle_url and not subtitle_url.strip().split('?')[0].endswith('.json'):
        raise HTTPException(status_code=422, detail="β Subtitle URL must point to a .json file")
    if audio_file and audio_file.content_type not in {"audio/mpeg", "audio/mp3", "audio/wav", "audio/x-wav", "audio/aac", "audio/mp4", "audio/x-m4a"}:
        raise HTTPException(status_code=422, detail=f"β Invalid audio format: {audio_file.content_type}")
    if book_cover_file and book_cover_file.content_type not in {"image/jpeg", "image/png", "image/webp"}:
        raise HTTPException(status_code=422, detail="β Book cover must be jpeg, png, or webp")
    temp_dir = tempfile.mkdtemp()
    payload = {
        'video_file': None, 'video_url': video_url,
        'audio_file': None, 'audio_url': audio_url,
        'subtitle_file': None, 'subtitle_url': subtitle_url,
        'book_cover_file': None, 'book_cover_url': book_cover_url,
        'book_cover_base64': book_cover_base64, 'book_id': book_id,
        'book_title': book_title,
        'enable_highlight': enable_highlight,
        'highlight_color': highlight_color,
        'font_size': font_size,
        'crf_quality': crf_quality,
    }
    try:
        if video_file is not None:
            payload['video_file'] = _save_upload_to_temp(video_file, temp_dir)
        if audio_file is not None:
            payload['audio_file'] = _save_upload_to_temp(audio_file, temp_dir)
        if subtitle_file is not None:
            payload['subtitle_file'] = _save_upload_to_temp(subtitle_file, temp_dir)
        if book_cover_file is not None:
            payload['book_cover_file'] = _save_upload_to_temp(book_cover_file, temp_dir)
        start_time = time.time()
        # get_running_loop() replaces the deprecated get_event_loop(); inside a
        # coroutine it is the correct, unambiguous way to reach the loop. The
        # blocking ffmpeg pipeline runs in the default thread-pool executor so
        # the event loop stays responsive.
        loop = asyncio.get_running_loop()
        result_path, message = await loop.run_in_executor(
            None,
            lambda: stitch_media(
                payload.get('video_file'), payload.get('video_url'),
                payload.get('audio_file'), payload.get('audio_url'),
                payload.get('subtitle_file'), payload.get('subtitle_url'),
                payload.get('book_cover_file'), payload.get('book_cover_url'),
                payload.get('book_cover_base64'), payload.get('book_id'),
                payload.get('book_title'),
                payload.get('enable_highlight', True),
                payload.get('highlight_color', 'yellow'),
                payload.get('font_size', 10),
                payload.get('crf_quality', 23),
            )
        )
        run_time = int(time.time() - start_time)
        run_time_fmt = f"{run_time // 60}m {run_time % 60}s"
        if result_path:
            file_size_mb = os.path.getsize(result_path) / (1024 * 1024)
            return FileResponse(
                result_path,
                media_type='video/mp4',
                filename=os.path.basename(result_path),
                headers={
                    "X-Status": "completed",
                    "X-Run-Time": run_time_fmt,
                    "X-File-Size-MB": f"{file_size_mb:.2f}",
                }
            )
        else:
            return JSONResponse(
                {'status': 'failed', 'message': message, 'run_time': run_time_fmt},
                status_code=400
            )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # The rendered file lives in stitch_media's own temp directory, so the
        # upload staging directory can be removed once the pipeline returns.
        # (Previously this directory leaked on every request.)
        shutil.rmtree(temp_dir, ignore_errors=True)
# NOTE(review): this handler had no route decorator and so was never reachable;
# "/health" is an assumed path — confirm the intended route.
@app.get("/health")
async def health():
    """Liveness probe: always reports the service as up."""
    return {"status": "ok"}