| """ | |
| Hugging Face Gradio Space: Text/Audio → Timestamped Video Pipeline | |
| Production-ready, API-first, modular architecture | |
| This space acts as Step 1+2+3 of an autonomous YouTube video pipeline. | |
| Designed to be called programmatically via Gradio API from Google Apps Script. | |
| """ | |
import gradio as gr
import json
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
import tempfile
import warnings

warnings.filterwarnings('ignore')

# Core dependencies
import torch
import whisper
import edge_tts
import asyncio
import pandas as pd
import numpy as np
import requests
from datetime import datetime
import re
import nltk
import subprocess
import ffmpeg
# MoviePy imports (handle different versions)
try:
    from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, CompositeAudioClip
    from moviepy.video.fx.resize import resize as moviepy_resize
    MOVIEPY_AVAILABLE = True
except ImportError:
    try:
        import moviepy.editor as mpy
        VideoFileClip = mpy.VideoFileClip
        AudioFileClip = mpy.AudioFileClip
        concatenate_videoclips = mpy.concatenate_videoclips
        CompositeAudioClip = mpy.CompositeAudioClip
        MOVIEPY_AVAILABLE = True
    except ImportError:
        MOVIEPY_AVAILABLE = False
        print("WARNING: MoviePy not available. Using ffmpeg directly for video assembly.")
# Download NLTK data
try:
    nltk.data.find('corpora/stopwords')
except LookupError:
    nltk.download('stopwords', quiet=True)

from nltk.corpus import stopwords
# ==================== CONFIGURATION ====================

# Common words to filter (configurable)
COMMON_WORDS = set([
    'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
    'of', 'with', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
    'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'should',
    'could', 'may', 'might', 'must', 'can', 'it', 'its', 'this', 'that',
    'these', 'those', 'i', 'you', 'he', 'she', 'we', 'they', 'them', 'their'
])

# Add NLTK stopwords (skip silently if the corpus is unavailable)
try:
    COMMON_WORDS.update(stopwords.words('english'))
except LookupError:
    pass

# Pexels API key (free tier - register at pexels.com)
PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', '')

# Tenor API key (free tier - register at tenor.com)
TENOR_API_KEY = os.environ.get('TENOR_API_KEY', '')
# ==================== STEP 1: SPEECH-TO-TEXT ====================

class SpeechToText:
    """Handles audio transcription using Whisper"""

    def __init__(self):
        self.model = None

    def load_model(self, model_size="base"):
        """Load Whisper model (lazy loading)"""
        if self.model is None:
            print(f"Loading Whisper {model_size} model...")
            self.model = whisper.load_model(model_size)
        return self.model

    def transcribe(self, audio_path: str, language: Optional[str] = None) -> str:
        """
        Transcribe audio to text

        Args:
            audio_path: Path to audio file
            language: Language code (e.g., 'en', 'es') or None for auto-detect

        Returns:
            Transcribed text
        """
        model = self.load_model()
        result = model.transcribe(
            audio_path,
            language=language,
            fp16=torch.cuda.is_available()
        )
        return result['text'].strip()
# ==================== STEP 2: TEXT-TO-SPEECH ====================

class TextToSpeech:
    """Handles TTS using Edge TTS (free, high quality)"""

    # Available voices (extensible)
    VOICES = {
        'en-US-AriaNeural': 'English (US) - Aria (Female)',
        'en-US-GuyNeural': 'English (US) - Guy (Male)',
        'en-GB-SoniaNeural': 'English (UK) - Sonia (Female)',
        'en-GB-RyanNeural': 'English (UK) - Ryan (Male)',
        'en-IN-NeerjaNeural': 'English (India) - Neerja (Female)',
        'en-IN-PrabhatNeural': 'English (India) - Prabhat (Male)',
        'es-ES-ElviraNeural': 'Spanish (Spain) - Elvira (Female)',
        'fr-FR-DeniseNeural': 'French (France) - Denise (Female)',
        'de-DE-KatjaNeural': 'German - Katja (Female)',
        'hi-IN-SwaraNeural': 'Hindi - Swara (Female)',
    }

    async def synthesize_async(
        self,
        text: str,
        voice: str = 'en-US-AriaNeural',
        rate: str = '+0%',
        pitch: str = '+0Hz',
        output_path: str = 'output.wav'
    ) -> str:
        """
        Synthesize speech from text (async)

        Args:
            text: Input text
            voice: Voice ID
            rate: Speed adjustment (e.g., '+10%', '-20%')
            pitch: Pitch adjustment (e.g., '+5Hz', '-10Hz')
            output_path: Output file path

        Returns:
            Path to generated audio file
        """
        communicate = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
        await communicate.save(output_path)
        return output_path

    def synthesize(self, text: str, voice: str, rate: float, pitch: int, output_path: str) -> str:
        """Sync wrapper for TTS"""
        # Convert the rate multiplier (1.0 = normal) to edge-tts's signed
        # percentage string, and the pitch offset to a signed Hz string.
        rate_str = f"{int((rate - 1.0) * 100):+d}%"
        pitch_str = f"{pitch:+d}Hz"
        return asyncio.run(self.synthesize_async(text, voice, rate_str, pitch_str, output_path))
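
# --- Example (illustrative, not called anywhere in the pipeline) ---------------
# A minimal sketch of using TextToSpeech directly from a plain script. The text,
# voice and output filename are placeholders; note that edge-tts encodes MP3
# audio regardless of the extension given here.
def _example_tts_usage():
    tts = TextToSpeech()
    # speed 1.5 becomes rate '+50%', pitch -3 becomes '-3Hz'
    return tts.synthesize(
        "Hello from the pipeline.",
        voice='en-US-GuyNeural',
        rate=1.5,
        pitch=-3,
        output_path='sample_narration.wav',
    )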
# ==================== STEP 3: WORD-LEVEL TIMESTAMPS ====================

class TimestampGenerator:
    """Generate word-level timestamps using Whisper"""

    def __init__(self):
        self.model = None

    def load_model(self, model_size="base"):
        """Load Whisper model"""
        if self.model is None:
            self.model = whisper.load_model(model_size)
        return self.model

    def generate_timestamps(self, audio_path: str) -> pd.DataFrame:
        """
        Generate word-level timestamps

        Args:
            audio_path: Path to audio file

        Returns:
            DataFrame with columns: word, start, end, duration
        """
        model = self.load_model()
        result = model.transcribe(
            audio_path,
            word_timestamps=True,
            fp16=torch.cuda.is_available()
        )
        timestamps = []
        for segment in result['segments']:
            if 'words' in segment:
                for word_info in segment['words']:
                    timestamps.append({
                        'word': word_info['word'].strip(),
                        'start': word_info['start'],
                        'end': word_info['end'],
                        'duration': word_info['end'] - word_info['start']
                    })
        return pd.DataFrame(timestamps)
# ==================== STEP 4: TIMESTAMP CLEANING ====================

class TimestampCleaner:
    """Clean and extend timestamps for scene alignment"""

    def __init__(self, common_words: set = COMMON_WORDS):
        self.common_words = common_words

    def is_meaningful(self, word: str) -> bool:
        """Check if word is meaningful (not common/stopword)"""
        word_lower = word.lower().strip()
        # Remove punctuation
        word_clean = re.sub(r'[^\w\s]', '', word_lower)
        if len(word_clean) < 2:
            return False
        if word_clean in self.common_words:
            return False
        if word_clean.isdigit():
            return False
        return True

    def remove_repetitive(self, words: List[str], window: int = 3) -> List[bool]:
        """Mark repetitive words within a window"""
        keep = [True] * len(words)
        for i in range(len(words)):
            word = words[i].lower().strip()
            # Check if this word appears in the next 'window' words
            for j in range(i + 1, min(i + window + 1, len(words))):
                if words[j].lower().strip() == word:
                    keep[j] = False
        return keep

    def clean_timestamps(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean timestamps: remove common words, repetitive words

        Args:
            df: DataFrame with word-level timestamps

        Returns:
            Cleaned DataFrame
        """
        # Filter meaningful words
        df['meaningful'] = df['word'].apply(self.is_meaningful)
        # Remove repetitive words
        keep_mask = self.remove_repetitive(df['word'].tolist())
        df['not_repetitive'] = keep_mask
        # Combine filters
        df_cleaned = df[df['meaningful'] & df['not_repetitive']].copy()
        return df_cleaned[['word', 'start', 'end', 'duration']].reset_index(drop=True)

    def extend_timestamps(self, df: pd.DataFrame, min_duration: float = 2.0) -> pd.DataFrame:
        """
        Extend timestamp durations to next word (scene duration logic)

        Args:
            df: Cleaned timestamps
            min_duration: Minimum scene duration

        Returns:
            DataFrame with extended durations
        """
        df = df.copy()
        for i in range(len(df) - 1):
            # Extend to next word's start time
            df.loc[i, 'end'] = df.loc[i + 1, 'start']
            df.loc[i, 'duration'] = df.loc[i, 'end'] - df.loc[i, 'start']
        # Ensure minimum duration
        df['duration'] = df['duration'].clip(lower=min_duration)
        df['end'] = df['start'] + df['duration']
        return df
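
# --- Example (illustrative, not called anywhere in the pipeline) ---------------
# A minimal sketch with made-up timings showing the cleaning rules above:
# stopwords such as 'the' are dropped, then each remaining word's scene is
# stretched to the next word's start time with a 2-second floor.
def _example_timestamp_cleaning():
    raw = pd.DataFrame([
        {'word': 'the',    'start': 0.0, 'end': 0.2, 'duration': 0.2},
        {'word': 'galaxy', 'start': 0.2, 'end': 0.9, 'duration': 0.7},
        {'word': 'slowly', 'start': 0.9, 'end': 1.4, 'duration': 0.5},
        {'word': 'spins',  'start': 1.4, 'end': 1.8, 'duration': 0.4},
    ])
    cleaner = TimestampCleaner()
    cleaned = cleaner.clean_timestamps(raw)    # 'the' is filtered out
    return cleaner.extend_timestamps(cleaned)  # durations clipped up to >= 2.0 s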
# ==================== STEP 5: VISUAL FETCHING ====================

class VisualFetcher:
    """Fetch videos/GIFs from Pexels or Tenor"""

    def __init__(self, pexels_key: str = PEXELS_API_KEY, tenor_key: str = TENOR_API_KEY):
        self.pexels_key = pexels_key
        self.tenor_key = tenor_key

    def create_placeholder_video(self, output_path: str, duration: float = 3.0, text: str = "") -> bool:
        """Create a placeholder video using ffmpeg when no real video is available"""
        try:
            # Create a solid-color video with a text overlay
            color = "0x2C3E50"  # Dark blue-gray
            cmd = [
                'ffmpeg', '-y',
                '-f', 'lavfi', '-i', f'color=c={color}:s=1920x1080:d={duration}',
                '-vf', f"drawtext=text='{text}':fontcolor=white:fontsize=60:x=(w-text_w)/2:y=(h-text_h)/2",
                '-c:v', 'libx264', '-t', str(duration), '-pix_fmt', 'yuv420p',
                output_path
            ]
            subprocess.run(cmd, check=True, capture_output=True, timeout=30)
            if os.path.exists(output_path) and os.path.getsize(output_path) > 1000:
                print(f"Created placeholder video: {output_path}")
                return True
        except Exception as e:
            print(f"Failed to create placeholder: {e}")
        return False
    def search_pexels(self, query: str, per_page: int = 1) -> Optional[str]:
        """Search Pexels for video URL"""
        if not self.pexels_key:
            print("WARNING: PEXELS_API_KEY not set")
            return None
        url = "https://api.pexels.com/videos/search"
        headers = {"Authorization": self.pexels_key}
        params = {"query": query, "per_page": per_page, "orientation": "landscape"}
        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if data.get('videos'):
                # Prefer an HD/SD rendition
                video_files = data['videos'][0]['video_files']
                for vf in video_files:
                    if vf.get('quality') in ['hd', 'sd']:
                        print(f"Found Pexels video: {vf['quality']} - {vf.get('width')}x{vf.get('height')}")
                        return vf['link']
                # Fall back to the first available file
                if video_files:
                    print("Using fallback video file")
                    return video_files[0]['link']
            else:
                print(f"No Pexels results for: {query}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                print(f"Pexels rate limit exceeded for '{query}'")
            else:
                print(f"Pexels HTTP error for '{query}': {e}")
        except Exception as e:
            print(f"Pexels error for '{query}': {e}")
        return None
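
    # Response shape that search_pexels() relies on (abridged sketch; field names
    # as used above, values illustrative):
    #   {"videos": [{"video_files": [
    #       {"quality": "hd", "width": 1920, "height": 1080, "link": "https://..."}]}]}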
    def search_tenor(self, query: str, limit: int = 1) -> Optional[str]:
        """Search Tenor for GIF URL"""
        if not self.tenor_key:
            return None
        url = "https://tenor.googleapis.com/v2/search"
        params = {
            "q": query,
            "key": self.tenor_key,
            "limit": limit,
            "media_filter": "mp4"
        }
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if data['results']:
                return data['results'][0]['media_formats']['mp4']['url']
        except Exception as e:
            print(f"Tenor error for '{query}': {e}")
        return None

    def download_video(self, url: str, output_path: str) -> bool:
        """Download video from URL"""
        try:
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()
            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            # Verify the file was downloaded and has content
            if os.path.exists(output_path) and os.path.getsize(output_path) > 1000:
                return True
            else:
                print(f"Downloaded file is invalid or too small: {output_path}")
                return False
        except Exception as e:
            print(f"Download error: {e}")
            return False
    def fetch_visuals(
        self,
        queries: List[str],
        output_dir: str,
        source: str = 'pexels',
        use_placeholders: bool = True
    ) -> List[str]:
        """
        Fetch and download videos for a list of queries

        Args:
            queries: List of search queries
            output_dir: Directory to save videos
            source: 'pexels' or 'tenor'
            use_placeholders: Create placeholder videos if download fails

        Returns:
            List of downloaded video paths (None for failed downloads)
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        video_paths = []
        for i, query in enumerate(queries, 1):
            print(f"Fetching visual {i}/{len(queries)}: {query}")
            output_path = os.path.join(output_dir, f"{i}.mp4")
            success = False

            # Try to download a real video
            if source == 'pexels':
                video_url = self.search_pexels(query)
            elif source == 'tenor':
                video_url = self.search_tenor(query)
            else:
                video_url = None

            if video_url:
                success = self.download_video(video_url, output_path)

            if success and os.path.exists(output_path):
                print(f"✓ Downloaded: {output_path} ({os.path.getsize(output_path)} bytes)")
                video_paths.append(output_path)
                continue

            # Fall back to a placeholder if the download failed
            if use_placeholders:
                print(f"⚠ Creating placeholder for: {query}")
                if self.create_placeholder_video(output_path, duration=3.0, text=query[:30]):
                    video_paths.append(output_path)
                else:
                    print(f"✗ Failed to create placeholder for: {query}")
                    video_paths.append(None)
            else:
                print(f"✗ No video available for: {query}")
                video_paths.append(None)

        # Log summary
        valid_count = sum(1 for p in video_paths if p is not None)
        print(f"Download summary: {valid_count}/{len(queries)} videos available")
        return video_paths
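
# --- Example (illustrative, not called anywhere in the pipeline) ---------------
# A minimal sketch of fetching one clip per keyword into a scratch directory.
# The keywords and directory are placeholders; without a PEXELS_API_KEY in the
# environment every entry falls back to a generated placeholder clip.
def _example_fetch_visuals():
    fetcher = VisualFetcher()
    return fetcher.fetch_visuals(
        queries=['galaxy', 'telescope'],
        output_dir=os.path.join(tempfile.gettempdir(), 'visuals_demo'),
        source='pexels',
        use_placeholders=True,
    )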
# ==================== STEP 6: VIDEO ASSEMBLY ====================

# Typing-only import so the VideoFileClip hints below work even when MoviePy
# is not installed at runtime.
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from moviepy.editor import VideoFileClip
class VideoAssembler:
    """Assemble final video from clips + audio + timestamps"""

    def __init__(self):
        self.use_moviepy = MOVIEPY_AVAILABLE

    def resize_clip_moviepy(self, clip: "VideoFileClip", aspect_ratio: str) -> "VideoFileClip":
        """Resize clip to target aspect ratio using MoviePy"""
        target_ratios = {
            '16:9': (1920, 1080),
            '9:16': (1080, 1920),
            '1:1': (1080, 1080),
            '4:3': (1440, 1080)
        }
        target_w, target_h = target_ratios.get(aspect_ratio, (1920, 1080))

        # Calculate scale so the clip covers the target area
        scale_w = target_w / clip.w
        scale_h = target_h / clip.h
        scale = max(scale_w, scale_h)

        # Resize, then center-crop to the target frame
        resized = clip.resize(scale)
        x_center = resized.w / 2
        y_center = resized.h / 2
        x1 = x_center - target_w / 2
        y1 = y_center - target_h / 2
        return resized.crop(x1=x1, y1=y1, width=target_w, height=target_h)
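
    # Worked example of the cover-and-crop math above (illustrative numbers):
    # a 1280x720 source targeted at 9:16 (1080x1920) scales by
    # max(1080/1280, 1920/720) ≈ 2.67 to roughly 3413x1920, then center-crops
    # the width down to 1080.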
    def assemble_with_ffmpeg(
        self,
        video_paths: List[str],
        timestamps: pd.DataFrame,
        audio_path: str,
        output_path: str,
        aspect_ratio: str = '16:9'
    ) -> str:
        """Assemble video using ffmpeg directly (fallback method)"""
        target_ratios = {
            '16:9': (1920, 1080),
            '9:16': (1080, 1920),
            '1:1': (1080, 1080),
            '4:3': (1440, 1080)
        }
        target_w, target_h = target_ratios.get(aspect_ratio, (1920, 1080))

        # Create temporary directory for processed clips
        temp_dir = os.path.join(os.path.dirname(output_path), 'temp_clips')
        Path(temp_dir).mkdir(parents=True, exist_ok=True)
        processed_clips = []

        # Process each clip with ffmpeg
        for i, (video_path, row) in enumerate(zip(video_paths, timestamps.itertuples())):
            if not video_path or not os.path.exists(video_path):
                print(f"Skipping clip {i}: Invalid path")
                continue

            # Verify the file exists and has a plausible size
            file_size = os.path.getsize(video_path)
            if file_size < 1000:
                print(f"Skipping clip {i}: File too small ({file_size} bytes)")
                continue

            temp_output = os.path.join(temp_dir, f'clip_{i:04d}.mp4')
            try:
                print(f"Processing clip {i}: {video_path} -> {temp_output}")
                # Resize, crop, and set duration using ffmpeg
                subprocess.run([
                    'ffmpeg', '-y', '-i', video_path,
                    '-vf', f'scale={target_w}:{target_h}:force_original_aspect_ratio=increase,crop={target_w}:{target_h}',
                    '-t', str(row.duration),
                    '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
                    '-an',  # Strip audio from individual clips
                    temp_output
                ], check=True, capture_output=True, text=True)

                # Verify the output exists
                if os.path.exists(temp_output) and os.path.getsize(temp_output) > 1000:
                    processed_clips.append(temp_output)
                    print(f"✓ Processed clip {i}: {os.path.getsize(temp_output)} bytes")
                else:
                    print(f"✗ Failed to process clip {i}: Output invalid")
            except subprocess.CalledProcessError as e:
                print(f"✗ ffmpeg error for clip {i}:")
                print(f"  stdout: {e.stdout}")
                print(f"  stderr: {e.stderr}")
                continue
            except Exception as e:
                print(f"✗ Unexpected error processing clip {i}: {e}")
                continue

        if not processed_clips:
            raise ValueError(f"No valid clips processed. Checked {len(video_paths)} input videos.")
        print(f"Successfully processed {len(processed_clips)} clips")

        # Create concat file
        concat_file = os.path.join(temp_dir, 'concat.txt')
        with open(concat_file, 'w') as f:
            for clip in processed_clips:
                f.write(f"file '{os.path.abspath(clip)}'\n")

        print(f"Concatenating {len(processed_clips)} clips...")
        # Concatenate clips
        temp_video = os.path.join(temp_dir, 'concatenated.mp4')
        subprocess.run([
            'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', concat_file,
            '-c', 'copy', temp_video
        ], check=True, capture_output=True)

        print("Adding audio track...")
        # Add audio
        subprocess.run([
            'ffmpeg', '-y', '-i', temp_video, '-i', audio_path,
            '-c:v', 'copy', '-c:a', 'aac', '-shortest',
            output_path
        ], check=True, capture_output=True)

        print(f"✓ Final video created: {output_path}")

        # Cleanup
        shutil.rmtree(temp_dir, ignore_errors=True)
        return output_path
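
    # The concat list written by assemble_with_ffmpeg() looks like this (paths
    # are illustrative; one line per processed clip):
    #   file '/tmp/video_pipeline_xxx/temp_clips/clip_0000.mp4'
    #   file '/tmp/video_pipeline_xxx/temp_clips/clip_0001.mp4'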
    def assemble_with_moviepy(
        self,
        video_paths: List[str],
        timestamps: pd.DataFrame,
        audio_path: str,
        output_path: str,
        aspect_ratio: str = '16:9'
    ) -> str:
        """Assemble video using MoviePy"""
        clips = []
        for i, (video_path, row) in enumerate(zip(video_paths, timestamps.itertuples())):
            if video_path and os.path.exists(video_path):
                try:
                    clip = VideoFileClip(video_path)
                    # Resize to aspect ratio
                    clip = self.resize_clip_moviepy(clip, aspect_ratio)
                    # Set duration to match the timestamp
                    clip = clip.set_duration(row.duration)
                    clips.append(clip)
                except Exception as e:
                    print(f"Error processing clip {i}: {e}")
                    continue

        if not clips:
            raise ValueError("No valid video clips to assemble")

        # Concatenate clips
        final_video = concatenate_videoclips(clips, method='compose')

        # Add audio
        if os.path.exists(audio_path):
            audio = AudioFileClip(audio_path)
            final_video = final_video.set_audio(audio)

        # Write output
        final_video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            fps=24,
            preset='medium',
            threads=4
        )

        # Cleanup
        for clip in clips:
            clip.close()
        final_video.close()
        return output_path
    def assemble_video(
        self,
        video_paths: List[str],
        timestamps: pd.DataFrame,
        audio_path: str,
        output_path: str,
        aspect_ratio: str = '16:9',
        add_subtitles: bool = False
    ) -> str:
        """
        Assemble final video (auto-selects MoviePy or ffmpeg)

        Args:
            video_paths: List of video clip paths
            timestamps: DataFrame with word timestamps
            audio_path: Path to narration audio
            output_path: Output video path
            aspect_ratio: Target aspect ratio
            add_subtitles: Whether to add subtitles (future implementation)

        Returns:
            Path to final video
        """
        if self.use_moviepy:
            try:
                return self.assemble_with_moviepy(
                    video_paths, timestamps, audio_path, output_path, aspect_ratio
                )
            except Exception as e:
                print(f"MoviePy failed: {e}. Falling back to ffmpeg...")
                return self.assemble_with_ffmpeg(
                    video_paths, timestamps, audio_path, output_path, aspect_ratio
                )
        else:
            return self.assemble_with_ffmpeg(
                video_paths, timestamps, audio_path, output_path, aspect_ratio
            )
# ==================== MAIN PIPELINE ====================

class VideoPipeline:
    """Main pipeline orchestrator"""

    def __init__(self):
        self.stt = SpeechToText()
        self.tts = TextToSpeech()
        self.timestamp_gen = TimestampGenerator()
        self.timestamp_cleaner = TimestampCleaner()
        self.visual_fetcher = VisualFetcher()
        self.video_assembler = VideoAssembler()

    def process(
        self,
        # Inputs
        text_input: Optional[str] = None,
        markdown_file: Optional[str] = None,
        audio_file: Optional[str] = None,
        json_input: Optional[str] = None,
        # Configuration
        language: str = 'en',
        voice: str = 'en-US-AriaNeural',
        speed: float = 1.0,
        pitch: int = 0,
        aspect_ratio: str = '16:9',
        visual_source: str = 'pexels',
        add_subtitles: bool = False,
        # Output directory
        output_dir: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Main pipeline execution

        Returns:
            Dictionary with paths to all outputs and logs
        """
        # Create output directory
        if output_dir is None:
            output_dir = tempfile.mkdtemp(prefix='video_pipeline_')
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        logs = []
        try:
            # ===== INPUT PRIORITY LOGIC =====
            # 1. JSON override
            if json_input:
                try:
                    config = json.loads(json_input)
                    text_input = config.get('text', text_input)
                    voice = config.get('voice', voice)
                    speed = config.get('speed', speed)
                    pitch = config.get('pitch', pitch)
                    aspect_ratio = config.get('aspect_ratio', aspect_ratio)
                    visual_source = config.get('visual_source', visual_source)
                    add_subtitles = config.get('subtitles', add_subtitles)
                    language = config.get('language', language)
                    logs.append("JSON config loaded")
                except json.JSONDecodeError as e:
                    logs.append(f"JSON parse error: {e}")

            # 2. Audio transcription
            if audio_file and os.path.exists(audio_file):
                logs.append("Transcribing audio...")
                text_input = self.stt.transcribe(audio_file, language if language != 'auto' else None)
                logs.append(f"Transcription: {text_input[:100]}...")

            # 3. Markdown extraction
            elif markdown_file and os.path.exists(markdown_file):
                logs.append("Reading markdown file...")
                with open(markdown_file, 'r', encoding='utf-8') as f:
                    text_input = f.read()
                logs.append(f"Markdown text: {text_input[:100]}...")

            # 4. Direct text
            if not text_input or not text_input.strip():
                return {
                    'status': 'error',
                    'message': 'No input provided',
                    'logs': logs
                }
            text_input = text_input.strip()
            # ===== STEP 2: TEXT-TO-SPEECH =====
            audio_path = os.path.join(output_dir, 'narration.wav')
            logs.append(f"Generating speech with voice: {voice}")
            self.tts.synthesize(text_input, voice, speed, pitch, audio_path)
            logs.append(f"Audio generated: {audio_path}")

            # ===== STEP 3: WORD-LEVEL TIMESTAMPS =====
            logs.append("Generating word-level timestamps...")
            timestamps_df = self.timestamp_gen.generate_timestamps(audio_path)
            raw_csv_path = os.path.join(output_dir, 'timestamps_raw.csv')
            timestamps_df.to_csv(raw_csv_path, index=False)
            logs.append(f"Raw timestamps: {len(timestamps_df)} words")

            # ===== STEP 4: TIMESTAMP CLEANING =====
            logs.append("Cleaning timestamps...")
            cleaned_df = self.timestamp_cleaner.clean_timestamps(timestamps_df)
            logs.append(f"Cleaned: {len(cleaned_df)} meaningful words")

            logs.append("Extending timestamp durations...")
            extended_df = self.timestamp_cleaner.extend_timestamps(cleaned_df)
            cleaned_csv_path = os.path.join(output_dir, 'timestamps_cleaned.csv')
            extended_df.to_csv(cleaned_csv_path, index=False)
            logs.append(f"Extended timestamps saved: {cleaned_csv_path}")

            # ===== STEP 5: VISUAL FETCHING =====
            logs.append(f"Fetching visuals from {visual_source}...")
            queries = extended_df['word'].tolist()
            visuals_dir = os.path.join(output_dir, 'visuals')
            video_paths = self.visual_fetcher.fetch_visuals(queries, visuals_dir, visual_source)
            valid_count = sum(1 for p in video_paths if p)
            logs.append(f"Downloaded {valid_count}/{len(queries)} videos")
            # ===== STEP 6: VIDEO ASSEMBLY =====
            logs.append("Assembling final video...")
            final_video_path = os.path.join(output_dir, 'final_video.mp4')

            # Filter out None paths and verify files exist
            valid_indices = []
            for i, p in enumerate(video_paths):
                if p and os.path.exists(p) and os.path.getsize(p) > 1000:
                    valid_indices.append(i)
                elif p:
                    logs.append(f"Skipping invalid video {i}: {p}")

            if not valid_indices:
                return {
                    'status': 'error',
                    'message': f'No valid video clips available. Downloaded {len(video_paths)} files but none are valid. Check API keys and rate limits.',
                    'audio_path': audio_path,
                    'timestamp_csv': cleaned_csv_path,
                    'logs': logs
                }

            valid_video_paths = [video_paths[i] for i in valid_indices]
            valid_timestamps = extended_df.iloc[valid_indices].reset_index(drop=True)
            logs.append(f"Valid clips for assembly: {len(valid_video_paths)}")

            # List valid video files with sizes
            for i, vp in enumerate(valid_video_paths):
                size_mb = os.path.getsize(vp) / (1024 * 1024)
                logs.append(f"  Clip {i}: {size_mb:.2f} MB - {valid_timestamps.iloc[i]['word']}")

            self.video_assembler.assemble_video(
                valid_video_paths,
                valid_timestamps,
                audio_path,
                final_video_path,
                aspect_ratio,
                add_subtitles
            )
            logs.append(f"Video assembly complete: {final_video_path}")

            # ===== RETURN RESULTS =====
            return {
                'status': 'success',
                'audio_path': audio_path,
                'timestamp_csv_raw': raw_csv_path,
                'timestamp_csv_cleaned': cleaned_csv_path,
                'video_path': final_video_path,
                'output_directory': output_dir,
                'logs': logs
            }

        except Exception as e:
            logs.append(f"ERROR: {str(e)}")
            import traceback
            logs.append(traceback.format_exc())
            return {
                'status': 'error',
                'message': str(e),
                'logs': logs
            }
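
# --- Example (illustrative, not called anywhere in the pipeline) ---------------
# A minimal sketch of running the orchestrator without the Gradio UI, e.g. from
# a notebook or worker script. The narration text and settings are placeholders;
# the returned dict has the same keys the Gradio API exposes.
def _example_run_pipeline():
    pipeline = VideoPipeline()
    return pipeline.process(
        text_input="Black holes bend light around them.",
        voice='en-US-AriaNeural',
        speed=1.0,
        pitch=0,
        aspect_ratio='9:16',
        visual_source='pexels',
    )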
# ==================== GRADIO INTERFACE ====================

def create_gradio_interface():
    """Create Gradio UI and API"""
    pipeline = VideoPipeline()

    def process_wrapper(
        text_input,
        markdown_file,
        audio_file,
        json_input,
        language,
        voice,
        speed,
        pitch,
        aspect_ratio,
        visual_source,
        add_subtitles
    ):
        """Wrapper for the Gradio interface"""
        result = pipeline.process(
            text_input=text_input,
            markdown_file=markdown_file.name if markdown_file else None,
            audio_file=audio_file.name if audio_file else None,
            json_input=json_input,
            language=language,
            voice=voice,
            speed=speed,
            pitch=pitch,
            aspect_ratio=aspect_ratio,
            visual_source=visual_source,
            add_subtitles=add_subtitles
        )

        # Format logs
        log_text = "\n".join(result.get('logs', []))

        # Return outputs
        if result['status'] == 'success':
            return (
                result.get('audio_path'),
                result.get('timestamp_csv_cleaned'),
                result.get('video_path'),
                log_text,
                json.dumps(result, indent=2)
            )
        else:
            return (
                None,
                None,
                None,
                log_text,
                json.dumps(result, indent=2)
            )
    # ===== GRADIO UI =====
    with gr.Blocks(title="Text/Audio → Video Pipeline") as demo:
        gr.Markdown("""
        # 🎬 Text/Audio → Timestamped Video Pipeline

        **Production-ready Gradio Space for automated video generation**

        This space converts text or voice into a complete narrated video with:
        - Speech synthesis
        - Word-level timestamps
        - Automated visual fetching
        - Final video assembly

        **API Available**: Use this space programmatically via the Gradio API
        """)

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📥 Input (Choose One or Multiple)")
                text_input = gr.Textbox(
                    label="Text Input",
                    placeholder="Enter your narration text here...",
                    lines=5
                )
                markdown_file = gr.File(
                    label="Markdown File (.md)",
                    file_types=['.md']
                )
                audio_file = gr.File(
                    label="Audio File (wav/mp3)",
                    file_types=['.wav', '.mp3', '.m4a']
                )
                json_input = gr.Textbox(
                    label="JSON API Input (Advanced)",
                    placeholder='{"text": "...", "voice": "...", ...}',
                    lines=3
                )
            with gr.Column():
                gr.Markdown("### ⚙️ Configuration")
                language = gr.Dropdown(
                    choices=['auto', 'en', 'es', 'fr', 'de', 'hi', 'ja', 'zh'],
                    value='en',
                    label="Language"
                )
                voice = gr.Dropdown(
                    choices=list(TextToSpeech.VOICES.keys()),
                    value='en-US-AriaNeural',
                    label="Voice"
                )
                speed = gr.Slider(
                    minimum=0.5,
                    maximum=2.0,
                    value=1.0,
                    step=0.1,
                    label="Speech Speed"
                )
                pitch = gr.Slider(
                    minimum=-20,
                    maximum=20,
                    value=0,
                    step=1,
                    label="Pitch (Hz)"
                )
                aspect_ratio = gr.Dropdown(
                    choices=['16:9', '9:16', '1:1', '4:3'],
                    value='16:9',
                    label="Aspect Ratio"
                )
                visual_source = gr.Dropdown(
                    choices=['pexels', 'tenor'],
                    value='pexels',
                    label="Visual Source"
                )
                add_subtitles = gr.Checkbox(
                    label="Add Subtitles (Future)",
                    value=False
                )

        run_btn = gr.Button("🚀 Run Pipeline", variant="primary", size="lg")

        gr.Markdown("### 📤 Outputs")
        with gr.Row():
            audio_output = gr.Audio(label="Generated Narration")
            csv_output = gr.File(label="Cleaned Timestamps (CSV)")
        video_output = gr.Video(label="Final Video")
        logs_output = gr.Textbox(
            label="Execution Logs",
            lines=10,
            max_lines=20
        )
        api_output = gr.JSON(label="API Response (JSON)")
        # Connect interface
        run_btn.click(
            fn=process_wrapper,
            inputs=[
                text_input,
                markdown_file,
                audio_file,
                json_input,
                language,
                voice,
                speed,
                pitch,
                aspect_ratio,
                visual_source,
                add_subtitles
            ],
            outputs=[
                audio_output,
                csv_output,
                video_output,
                logs_output,
                api_output
            ]
        )
        gr.Markdown("""
        ---
        ### 🔌 API Usage

        **Endpoint**: Use the Gradio Client to call this space programmatically

        ```python
        from gradio_client import Client

        client = Client("YOUR_SPACE_URL")
        result = client.predict(
            text_input="Your narration text",
            markdown_file=None,
            audio_file=None,
            json_input='{"voice": "en-IN-PrabhatNeural", "speed": 1.2}',
            language="en",
            voice="en-IN-PrabhatNeural",
            speed=1.0,
            pitch=0,
            aspect_ratio="16:9",
            visual_source="pexels",
            add_subtitles=False,
            api_name="/predict"
        )
        ```

        **Google Apps Script Integration**: Use UrlFetchApp to POST to the API endpoint
        """)

    return demo
# ==================== LAUNCH ====================

if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(
        share=False,
        server_name="0.0.0.0",
        server_port=7860
    )