""" Hugging Face Gradio Space: Text/Audio → Timestamped Video Pipeline Production-ready, API-first, modular architecture This space acts as Step 1+2+3 of an autonomous YouTube video pipeline. Designed to be called programmatically via Gradio API from Google Apps Script. """ import gradio as gr import json import os import shutil from pathlib import Path from typing import Dict, List, Optional, Tuple, Any import tempfile import warnings warnings.filterwarnings('ignore') # Core dependencies import torch import whisper import edge_tts import asyncio import pandas as pd import numpy as np import requests from datetime import datetime import re import nltk import subprocess import ffmpeg # MoviePy imports (handle different versions) try: from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, CompositeAudioClip from moviepy.video.fx.resize import resize as moviepy_resize MOVIEPY_AVAILABLE = True except ImportError: try: import moviepy.editor as mpy VideoFileClip = mpy.VideoFileClip AudioFileClip = mpy.AudioFileClip concatenate_videoclips = mpy.concatenate_videoclips CompositeAudioClip = mpy.CompositeAudioClip MOVIEPY_AVAILABLE = True except ImportError: MOVIEPY_AVAILABLE = False print("WARNING: MoviePy not available. Using ffmpeg directly for video assembly.") # Download NLTK data try: nltk.data.find('corpora/stopwords') except LookupError: nltk.download('stopwords', quiet=True) from nltk.corpus import stopwords # ==================== CONFIGURATION ==================== # Common words to filter (configurable) COMMON_WORDS = set([ 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'is', 'are', 'was', 'were', 'be', 'been', 'being', 'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'should', 'could', 'may', 'might', 'must', 'can', 'it', 'its', 'this', 'that', 'these', 'those', 'i', 'you', 'he', 'she', 'we', 'they', 'them', 'their' ]) # Add NLTK stopwords try: COMMON_WORDS.update(stopwords.words('english')) except: pass # Pexels API (Free tier - register at pexels.com) PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', '') # Tenor API (Free tier - register at tenor.com) TENOR_API_KEY = os.environ.get('TENOR_API_KEY', '') # ==================== STEP 1: SPEECH-TO-TEXT ==================== class SpeechToText: """Handles audio transcription using Whisper""" def __init__(self): self.model = None def load_model(self, model_size="base"): """Load Whisper model (lazy loading)""" if self.model is None: print(f"Loading Whisper {model_size} model...") self.model = whisper.load_model(model_size) return self.model def transcribe(self, audio_path: str, language: Optional[str] = None) -> str: """ Transcribe audio to text Args: audio_path: Path to audio file language: Language code (e.g., 'en', 'es') or None for auto-detect Returns: Transcribed text """ model = self.load_model() result = model.transcribe( audio_path, language=language, fp16=torch.cuda.is_available() ) return result['text'].strip() # ==================== STEP 2: TEXT-TO-SPEECH ==================== class TextToSpeech: """Handles TTS using Edge TTS (free, high quality)""" # Available voices (extensible) VOICES = { 'en-US-AriaNeural': 'English (US) - Aria (Female)', 'en-US-GuyNeural': 'English (US) - Guy (Male)', 'en-GB-SoniaNeural': 'English (UK) - Sonia (Female)', 'en-GB-RyanNeural': 'English (UK) - Ryan (Male)', 'en-IN-NeerjaNeural': 'English (India) - Neerja (Female)', 'en-IN-PrabhatNeural': 'English (India) - Prabhat (Male)', 'es-ES-ElviraNeural': 'Spanish (Spain) - Elvira (Female)', 
# ==================== STEP 2: TEXT-TO-SPEECH ====================

class TextToSpeech:
    """Handles TTS using Edge TTS (free, high quality)"""

    # Available voices (extensible)
    VOICES = {
        'en-US-AriaNeural': 'English (US) - Aria (Female)',
        'en-US-GuyNeural': 'English (US) - Guy (Male)',
        'en-GB-SoniaNeural': 'English (UK) - Sonia (Female)',
        'en-GB-RyanNeural': 'English (UK) - Ryan (Male)',
        'en-IN-NeerjaNeural': 'English (India) - Neerja (Female)',
        'en-IN-PrabhatNeural': 'English (India) - Prabhat (Male)',
        'es-ES-ElviraNeural': 'Spanish (Spain) - Elvira (Female)',
        'fr-FR-DeniseNeural': 'French (France) - Denise (Female)',
        'de-DE-KatjaNeural': 'German - Katja (Female)',
        'hi-IN-SwaraNeural': 'Hindi - Swara (Female)',
    }

    async def synthesize_async(
        self,
        text: str,
        voice: str = 'en-US-AriaNeural',
        rate: str = '+0%',
        pitch: str = '+0Hz',
        output_path: str = 'output.wav'
    ) -> str:
        """
        Synthesize speech from text (async)

        Args:
            text: Input text
            voice: Voice ID
            rate: Speed adjustment (e.g., '+10%', '-20%')
            pitch: Pitch adjustment (e.g., '+5Hz', '-10Hz')
            output_path: Output file path

        Returns:
            Path to generated audio file
        """
        communicate = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
        await communicate.save(output_path)
        return output_path

    def synthesize(self, text: str, voice: str, rate: float, pitch: int, output_path: str) -> str:
        """Sync wrapper for TTS"""
        # Convert rate (1.0 = normal) to percentage
        rate_str = f"{int((rate - 1.0) * 100):+d}%"
        # Cast pitch to int: Gradio sliders deliver floats, and '+d' formatting requires an integer
        pitch_str = f"{int(pitch):+d}Hz"
        return asyncio.run(self.synthesize_async(text, voice, rate_str, pitch_str, output_path))
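
# Illustrative sketch of how UI values map onto edge-tts parameters: synthesize()
# converts a multiplicative speed (1.0 = normal) into a signed percentage string and an
# integer pitch offset into a signed Hz string, e.g. rate=1.2 -> '+20%', pitch=-5 -> '-5Hz'.
# The output file name below is a placeholder; edge-tts needs network access.
def _example_tts() -> str:
    """Generate a short narration clip with a non-default voice, speed and pitch."""
    tts = TextToSpeech()
    return tts.synthesize(
        text="Hello from the pipeline.",
        voice='en-GB-RyanNeural',
        rate=1.2,    # becomes '+20%'
        pitch=-5,    # becomes '-5Hz'
        output_path='hello.wav',
    )
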
# ==================== STEP 3: WORD-LEVEL TIMESTAMPS ====================

class TimestampGenerator:
    """Generate word-level timestamps using Whisper"""

    def __init__(self):
        self.model = None

    def load_model(self, model_size="base"):
        """Load Whisper model"""
        if self.model is None:
            self.model = whisper.load_model(model_size)
        return self.model

    def generate_timestamps(self, audio_path: str) -> pd.DataFrame:
        """
        Generate word-level timestamps

        Args:
            audio_path: Path to audio file

        Returns:
            DataFrame with columns: word, start, end, duration
        """
        model = self.load_model()
        result = model.transcribe(
            audio_path,
            word_timestamps=True,
            fp16=torch.cuda.is_available()
        )

        timestamps = []
        for segment in result['segments']:
            if 'words' in segment:
                for word_info in segment['words']:
                    timestamps.append({
                        'word': word_info['word'].strip(),
                        'start': word_info['start'],
                        'end': word_info['end'],
                        'duration': word_info['end'] - word_info['start']
                    })

        return pd.DataFrame(timestamps)

# ==================== STEP 4: TIMESTAMP CLEANING ====================

class TimestampCleaner:
    """Clean and extend timestamps for scene alignment"""

    def __init__(self, common_words: set = COMMON_WORDS):
        self.common_words = common_words

    def is_meaningful(self, word: str) -> bool:
        """Check if word is meaningful (not common/stopword)"""
        word_lower = word.lower().strip()
        # Remove punctuation
        word_clean = re.sub(r'[^\w\s]', '', word_lower)

        if len(word_clean) < 2:
            return False
        if word_clean in self.common_words:
            return False
        if word_clean.isdigit():
            return False
        return True

    def remove_repetitive(self, words: List[str], window: int = 3) -> List[bool]:
        """Mark repetitive words within a window"""
        keep = [True] * len(words)
        for i in range(len(words)):
            word = words[i].lower().strip()
            # Check if this word appears in the next 'window' words
            for j in range(i + 1, min(i + window + 1, len(words))):
                if words[j].lower().strip() == word:
                    keep[j] = False
        return keep

    def clean_timestamps(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Clean timestamps: remove common words, repetitive words

        Args:
            df: DataFrame with word-level timestamps

        Returns:
            Cleaned DataFrame
        """
        # Work on a copy so the caller's raw DataFrame is not modified
        df = df.copy()

        # Filter meaningful words
        df['meaningful'] = df['word'].apply(self.is_meaningful)

        # Remove repetitive words
        keep_mask = self.remove_repetitive(df['word'].tolist())
        df['not_repetitive'] = keep_mask

        # Combine filters
        df_cleaned = df[df['meaningful'] & df['not_repetitive']].copy()

        return df_cleaned[['word', 'start', 'end', 'duration']].reset_index(drop=True)

    def extend_timestamps(self, df: pd.DataFrame, min_duration: float = 2.0) -> pd.DataFrame:
        """
        Extend timestamp durations to next word (scene duration logic)

        Args:
            df: Cleaned timestamps
            min_duration: Minimum scene duration (seconds)

        Returns:
            DataFrame with extended durations
        """
        df = df.copy()

        for i in range(len(df) - 1):
            # Extend to next word's start time
            df.loc[i, 'end'] = df.loc[i + 1, 'start']
            df.loc[i, 'duration'] = df.loc[i, 'end'] - df.loc[i, 'start']

        # Ensure minimum duration
        df['duration'] = df['duration'].clip(lower=min_duration)
        df['end'] = df['start'] + df['duration']

        return df
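
# Worked example (hedged sketch) of the cleaning + extension logic on a hand-built
# DataFrame, independent of Whisper: 'the' is dropped as a stopword, the second 'ocean'
# is dropped as repetitive, and each surviving word is stretched to the next word's
# start time with a 2-second floor so it can serve as a scene duration.
def _example_timestamp_cleaning() -> pd.DataFrame:
    """Show how raw word timings become scene-length segments."""
    raw = pd.DataFrame([
        {'word': 'the',   'start': 0.00, 'end': 0.15, 'duration': 0.15},
        {'word': 'ocean', 'start': 0.15, 'end': 0.60, 'duration': 0.45},
        {'word': 'ocean', 'start': 0.60, 'end': 1.00, 'duration': 0.40},
        {'word': 'waves', 'start': 1.00, 'end': 1.40, 'duration': 0.40},
    ])
    cleaner = TimestampCleaner()
    cleaned = cleaner.clean_timestamps(raw)
    return cleaner.extend_timestamps(cleaned, min_duration=2.0)
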
# ==================== STEP 5: VISUAL FETCHING ====================

class VisualFetcher:
    """Fetch videos/GIFs from Pexels or Tenor"""

    def __init__(self, pexels_key: str = PEXELS_API_KEY, tenor_key: str = TENOR_API_KEY):
        self.pexels_key = pexels_key
        self.tenor_key = tenor_key

    def create_placeholder_video(self, output_path: str, duration: float = 3.0, text: str = "") -> bool:
        """Create a placeholder video using ffmpeg when real video unavailable"""
        try:
            # Create a solid color video with text overlay
            color = "0x2C3E50"  # Dark blue-gray
            cmd = [
                'ffmpeg', '-y',
                '-f', 'lavfi',
                '-i', f'color=c={color}:s=1920x1080:d={duration}',
                '-vf', f'drawtext=text=\'{text}\':fontcolor=white:fontsize=60:x=(w-text_w)/2:y=(h-text_h)/2',
                '-c:v', 'libx264',
                '-t', str(duration),
                '-pix_fmt', 'yuv420p',
                output_path
            ]
            subprocess.run(cmd, check=True, capture_output=True, timeout=30)

            if os.path.exists(output_path) and os.path.getsize(output_path) > 1000:
                print(f"Created placeholder video: {output_path}")
                return True
        except Exception as e:
            print(f"Failed to create placeholder: {e}")
        return False

    def search_pexels(self, query: str, per_page: int = 1) -> Optional[str]:
        """Search Pexels for video URL"""
        if not self.pexels_key:
            print("WARNING: PEXELS_API_KEY not set")
            return None

        url = "https://api.pexels.com/videos/search"
        headers = {"Authorization": self.pexels_key}
        params = {"query": query, "per_page": per_page, "orientation": "landscape"}

        try:
            response = requests.get(url, headers=headers, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            if data.get('videos'):
                # Get medium quality video
                video_files = data['videos'][0]['video_files']
                for vf in video_files:
                    if vf.get('quality') in ['hd', 'sd']:
                        print(f"Found Pexels video: {vf['quality']} - {vf.get('width')}x{vf.get('height')}")
                        return vf['link']
                # Fallback to first available
                if video_files:
                    print("Using fallback video file")
                    return video_files[0]['link']
            else:
                print(f"No Pexels results for: {query}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 429:
                print(f"Pexels rate limit exceeded for '{query}'")
            else:
                print(f"Pexels HTTP error for '{query}': {e}")
        except Exception as e:
            print(f"Pexels error for '{query}': {e}")

        return None

    def search_tenor(self, query: str, limit: int = 1) -> Optional[str]:
        """Search Tenor for GIF URL"""
        if not self.tenor_key:
            return None

        url = "https://tenor.googleapis.com/v2/search"
        params = {
            "q": query,
            "key": self.tenor_key,
            "limit": limit,
            "media_filter": "mp4"
        }

        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()
            if data['results']:
                return data['results'][0]['media_formats']['mp4']['url']
        except Exception as e:
            print(f"Tenor error for '{query}': {e}")

        return None

    def download_video(self, url: str, output_path: str) -> bool:
        """Download video from URL"""
        try:
            response = requests.get(url, stream=True, timeout=30)
            response.raise_for_status()

            with open(output_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            # Verify file was downloaded and has content
            if os.path.exists(output_path) and os.path.getsize(output_path) > 1000:
                return True
            else:
                print(f"Downloaded file is invalid or too small: {output_path}")
                return False
        except Exception as e:
            print(f"Download error: {e}")
            return False

    def fetch_visuals(
        self,
        queries: List[str],
        output_dir: str,
        source: str = 'pexels',
        use_placeholders: bool = True
    ) -> List[str]:
        """
        Fetch and download videos for list of queries

        Args:
            queries: List of search queries
            output_dir: Directory to save videos
            source: 'pexels' or 'tenor'
            use_placeholders: Create placeholder videos if download fails

        Returns:
            List of downloaded video paths (None for failed downloads)
        """
        Path(output_dir).mkdir(parents=True, exist_ok=True)
        video_paths = []

        for i, query in enumerate(queries, 1):
            print(f"Fetching visual {i}/{len(queries)}: {query}")
            output_path = os.path.join(output_dir, f"{i}.mp4")
            success = False

            # Try to download real video
            if source == 'pexels':
                video_url = self.search_pexels(query)
            elif source == 'tenor':
                video_url = self.search_tenor(query)
            else:
                video_url = None

            if video_url:
                success = self.download_video(video_url, output_path)

            if success and os.path.exists(output_path):
                print(f"✓ Downloaded: {output_path} ({os.path.getsize(output_path)} bytes)")
                video_paths.append(output_path)
                continue

            # Fallback to placeholder if download failed
            if use_placeholders:
                print(f"⚠ Creating placeholder for: {query}")
                if self.create_placeholder_video(output_path, duration=3.0, text=query[:30]):
                    video_paths.append(output_path)
                else:
                    print(f"✗ Failed to create placeholder for: {query}")
                    video_paths.append(None)
            else:
                print(f"✗ No video available for: {query}")
                video_paths.append(None)

        # Log summary
        valid_count = sum(1 for p in video_paths if p is not None)
        print(f"Download summary: {valid_count}/{len(queries)} videos available")

        return video_paths
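
# Illustrative sketch of the fetch step on its own. It assumes PEXELS_API_KEY (or
# TENOR_API_KEY) is set in the environment; without a key the fetcher falls back to
# ffmpeg-generated placeholder clips. The query strings and output directory name are
# placeholders; failed entries come back as None.
def _example_fetch_visuals() -> List[str]:
    """Download (or synthesize placeholders for) one clip per search term."""
    fetcher = VisualFetcher()
    return fetcher.fetch_visuals(
        queries=['ocean waves', 'city skyline'],
        output_dir='example_visuals',
        source='pexels',
        use_placeholders=True,
    )
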
# ==================== STEP 6: VIDEO ASSEMBLY ====================

# Import only for type hints; the runtime MoviePy import is handled above
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from moviepy.editor import VideoFileClip


class VideoAssembler:
    """Assemble final video from clips + audio + timestamps"""

    def __init__(self):
        self.use_moviepy = MOVIEPY_AVAILABLE

    def resize_clip_moviepy(self, clip: "VideoFileClip", aspect_ratio: str) -> "VideoFileClip":
        """Resize clip to target aspect ratio using MoviePy"""
        target_ratios = {
            '16:9': (1920, 1080),
            '9:16': (1080, 1920),
            '1:1': (1080, 1080),
            '4:3': (1440, 1080)
        }
        target_w, target_h = target_ratios.get(aspect_ratio, (1920, 1080))

        # Calculate scale to cover target area
        scale_w = target_w / clip.w
        scale_h = target_h / clip.h
        scale = max(scale_w, scale_h)

        # Resize and crop
        resized = clip.resize(scale)

        # Center crop
        x_center = resized.w / 2
        y_center = resized.h / 2
        x1 = x_center - target_w / 2
        y1 = y_center - target_h / 2

        return resized.crop(x1=x1, y1=y1, width=target_w, height=target_h)

    def assemble_with_ffmpeg(
        self,
        video_paths: List[str],
        timestamps: pd.DataFrame,
        audio_path: str,
        output_path: str,
        aspect_ratio: str = '16:9'
    ) -> str:
        """Assemble video using ffmpeg directly (fallback method)"""
        target_ratios = {
            '16:9': (1920, 1080),
            '9:16': (1080, 1920),
            '1:1': (1080, 1080),
            '4:3': (1440, 1080)
        }
        target_w, target_h = target_ratios.get(aspect_ratio, (1920, 1080))

        # Create temporary directory for processed clips
        temp_dir = os.path.join(os.path.dirname(output_path), 'temp_clips')
        Path(temp_dir).mkdir(parents=True, exist_ok=True)

        processed_clips = []

        # Process each clip with ffmpeg
        for i, (video_path, row) in enumerate(zip(video_paths, timestamps.itertuples())):
            if not video_path or not os.path.exists(video_path):
                print(f"Skipping clip {i}: Invalid path")
                continue

            # Verify file exists and has size
            file_size = os.path.getsize(video_path)
            if file_size < 1000:
                print(f"Skipping clip {i}: File too small ({file_size} bytes)")
                continue

            temp_output = os.path.join(temp_dir, f'clip_{i:04d}.mp4')

            try:
                print(f"Processing clip {i}: {video_path} -> {temp_output}")

                # Resize, crop, and set duration using ffmpeg
                subprocess.run([
                    'ffmpeg', '-y',
                    '-i', video_path,
                    '-vf', f'scale={target_w}:{target_h}:force_original_aspect_ratio=increase,crop={target_w}:{target_h}',
                    '-t', str(row.duration),
                    '-c:v', 'libx264',
                    '-preset', 'fast',
                    '-crf', '23',
                    '-an',  # Remove audio from clips
                    temp_output
                ], check=True, capture_output=True, text=True)

                # Verify output exists
                if os.path.exists(temp_output) and os.path.getsize(temp_output) > 1000:
                    processed_clips.append(temp_output)
                    print(f"✓ Processed clip {i}: {os.path.getsize(temp_output)} bytes")
                else:
                    print(f"✗ Failed to process clip {i}: Output invalid")

            except subprocess.CalledProcessError as e:
                print(f"✗ ffmpeg error for clip {i}:")
                print(f"  stdout: {e.stdout}")
                print(f"  stderr: {e.stderr}")
                continue
            except Exception as e:
                print(f"✗ Unexpected error processing clip {i}: {e}")
                continue

        if not processed_clips:
            raise ValueError(f"No valid clips processed. Checked {len(video_paths)} input videos.")

        print(f"Successfully processed {len(processed_clips)} clips")

        # Create concat file
        concat_file = os.path.join(temp_dir, 'concat.txt')
        with open(concat_file, 'w') as f:
            for clip in processed_clips:
                f.write(f"file '{os.path.abspath(clip)}'\n")

        print(f"Concatenating {len(processed_clips)} clips...")

        # Concatenate clips
        temp_video = os.path.join(temp_dir, 'concatenated.mp4')
        subprocess.run([
            'ffmpeg', '-y',
            '-f', 'concat',
            '-safe', '0',
            '-i', concat_file,
            '-c', 'copy',
            temp_video
        ], check=True, capture_output=True)

        print("Adding audio track...")

        # Add audio
        subprocess.run([
            'ffmpeg', '-y',
            '-i', temp_video,
            '-i', audio_path,
            '-c:v', 'copy',
            '-c:a', 'aac',
            '-shortest',
            output_path
        ], check=True, capture_output=True)

        print(f"✓ Final video created: {output_path}")

        # Cleanup
        shutil.rmtree(temp_dir, ignore_errors=True)

        return output_path

    def assemble_with_moviepy(
        self,
        video_paths: List[str],
        timestamps: pd.DataFrame,
        audio_path: str,
        output_path: str,
        aspect_ratio: str = '16:9'
    ) -> str:
        """Assemble video using MoviePy"""
        clips = []

        for i, (video_path, row) in enumerate(zip(video_paths, timestamps.itertuples())):
            if video_path and os.path.exists(video_path):
                try:
                    clip = VideoFileClip(video_path)

                    # Resize to aspect ratio
                    clip = self.resize_clip_moviepy(clip, aspect_ratio)

                    # Set duration to match timestamp
                    clip = clip.set_duration(row.duration)

                    clips.append(clip)
                except Exception as e:
                    print(f"Error processing clip {i}: {e}")
                    continue

        if not clips:
            raise ValueError("No valid video clips to assemble")

        # Concatenate clips
        final_video = concatenate_videoclips(clips, method='compose')

        # Add audio
        if os.path.exists(audio_path):
            audio = AudioFileClip(audio_path)
            final_video = final_video.set_audio(audio)

        # Write output
        final_video.write_videofile(
            output_path,
            codec='libx264',
            audio_codec='aac',
            fps=24,
            preset='medium',
            threads=4
        )

        # Cleanup
        for clip in clips:
            clip.close()
        final_video.close()

        return output_path

    def assemble_video(
        self,
        video_paths: List[str],
        timestamps: pd.DataFrame,
        audio_path: str,
        output_path: str,
        aspect_ratio: str = '16:9',
        add_subtitles: bool = False
    ) -> str:
        """
        Assemble final video (auto-selects MoviePy or ffmpeg)

        Args:
            video_paths: List of video clip paths
            timestamps: DataFrame with word timestamps
            audio_path: Path to narration audio
            output_path: Output video path
            aspect_ratio: Target aspect ratio
            add_subtitles: Whether to add subtitles (future implementation)

        Returns:
            Path to final video
        """
        if self.use_moviepy:
            try:
                return self.assemble_with_moviepy(
                    video_paths, timestamps, audio_path, output_path, aspect_ratio
                )
            except Exception as e:
                print(f"MoviePy failed: {e}. Falling back to ffmpeg...")
                return self.assemble_with_ffmpeg(
                    video_paths, timestamps, audio_path, output_path, aspect_ratio
                )
        else:
            return self.assemble_with_ffmpeg(
                video_paths, timestamps, audio_path, output_path, aspect_ratio
            )
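
# Hedged sketch of running the assembly step against assets produced earlier (for
# example, re-rendering in a different aspect ratio without re-downloading clips).
# The file names are placeholders; the CSV is expected to carry the word/start/end/
# duration columns written by TimestampCleaner, with one row per clip in clip_paths.
def _example_reassemble(clip_paths: List[str]) -> str:
    """Rebuild the final video from existing clips, a narration track and a timestamp CSV."""
    timestamps = pd.read_csv('timestamps_cleaned.csv')
    assembler = VideoAssembler()
    return assembler.assemble_video(
        video_paths=clip_paths,
        timestamps=timestamps,
        audio_path='narration.wav',
        output_path='final_9x16.mp4',
        aspect_ratio='9:16',
    )
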
# ==================== MAIN PIPELINE ====================

class VideoPipeline:
    """Main pipeline orchestrator"""

    def __init__(self):
        self.stt = SpeechToText()
        self.tts = TextToSpeech()
        self.timestamp_gen = TimestampGenerator()
        self.timestamp_cleaner = TimestampCleaner()
        self.visual_fetcher = VisualFetcher()
        self.video_assembler = VideoAssembler()

    def process(
        self,
        # Inputs
        text_input: Optional[str] = None,
        markdown_file: Optional[str] = None,
        audio_file: Optional[str] = None,
        json_input: Optional[str] = None,
        # Configuration
        language: str = 'en',
        voice: str = 'en-US-AriaNeural',
        speed: float = 1.0,
        pitch: int = 0,
        aspect_ratio: str = '16:9',
        visual_source: str = 'pexels',
        add_subtitles: bool = False,
        # Output directory
        output_dir: str = None
    ) -> Dict[str, Any]:
        """
        Main pipeline execution

        Returns:
            Dictionary with paths to all outputs and logs
        """
        # Create output directory
        if output_dir is None:
            output_dir = tempfile.mkdtemp(prefix='video_pipeline_')
        Path(output_dir).mkdir(parents=True, exist_ok=True)

        logs = []

        try:
            # ===== INPUT PRIORITY LOGIC =====
            # 1. JSON override
            if json_input:
                try:
                    config = json.loads(json_input)
                    text_input = config.get('text', text_input)
                    voice = config.get('voice', voice)
                    speed = config.get('speed', speed)
                    pitch = config.get('pitch', pitch)
                    aspect_ratio = config.get('aspect_ratio', aspect_ratio)
                    visual_source = config.get('visual_source', visual_source)
                    add_subtitles = config.get('subtitles', add_subtitles)
                    language = config.get('language', language)
                    logs.append("JSON config loaded")
                except json.JSONDecodeError as e:
                    logs.append(f"JSON parse error: {e}")

            # 2. Audio transcription
            if audio_file and os.path.exists(audio_file):
                logs.append("Transcribing audio...")
                text_input = self.stt.transcribe(audio_file, language if language != 'auto' else None)
                logs.append(f"Transcription: {text_input[:100]}...")

            # 3. Markdown extraction
            elif markdown_file and os.path.exists(markdown_file):
                logs.append("Reading markdown file...")
                with open(markdown_file, 'r', encoding='utf-8') as f:
                    text_input = f.read()
                logs.append(f"Markdown text: {text_input[:100]}...")

            # 4. Direct text
            if not text_input or not text_input.strip():
                return {
                    'status': 'error',
                    'message': 'No input provided',
                    'logs': logs
                }

            text_input = text_input.strip()

            # ===== STEP 2: TEXT-TO-SPEECH =====
            audio_path = os.path.join(output_dir, 'narration.wav')
            logs.append(f"Generating speech with voice: {voice}")
            self.tts.synthesize(text_input, voice, speed, pitch, audio_path)
            logs.append(f"Audio generated: {audio_path}")

            # ===== STEP 3: WORD-LEVEL TIMESTAMPS =====
            logs.append("Generating word-level timestamps...")
            timestamps_df = self.timestamp_gen.generate_timestamps(audio_path)

            raw_csv_path = os.path.join(output_dir, 'timestamps_raw.csv')
            timestamps_df.to_csv(raw_csv_path, index=False)
            logs.append(f"Raw timestamps: {len(timestamps_df)} words")

            # ===== STEP 4: TIMESTAMP CLEANING =====
            logs.append("Cleaning timestamps...")
            cleaned_df = self.timestamp_cleaner.clean_timestamps(timestamps_df)
            logs.append(f"Cleaned: {len(cleaned_df)} meaningful words")

            logs.append("Extending timestamp durations...")
            extended_df = self.timestamp_cleaner.extend_timestamps(cleaned_df)

            cleaned_csv_path = os.path.join(output_dir, 'timestamps_cleaned.csv')
            extended_df.to_csv(cleaned_csv_path, index=False)
            logs.append(f"Extended timestamps saved: {cleaned_csv_path}")

            # ===== STEP 5: VISUAL FETCHING =====
            logs.append(f"Fetching visuals from {visual_source}...")
            queries = extended_df['word'].tolist()
            visuals_dir = os.path.join(output_dir, 'visuals')
            video_paths = self.visual_fetcher.fetch_visuals(queries, visuals_dir, visual_source)

            valid_count = sum(1 for p in video_paths if p)
            logs.append(f"Downloaded {valid_count}/{len(queries)} videos")

            # ===== STEP 6: VIDEO ASSEMBLY =====
            logs.append("Assembling final video...")
            final_video_path = os.path.join(output_dir, 'final_video.mp4')

            # Filter out None paths and verify files exist
            valid_indices = []
            for i, p in enumerate(video_paths):
                if p and os.path.exists(p) and os.path.getsize(p) > 1000:
                    valid_indices.append(i)
                else:
                    if p:
                        logs.append(f"Skipping invalid video {i}: {p}")

            if not valid_indices:
                return {
                    'status': 'error',
                    'message': f'No valid video clips available. Downloaded {len(video_paths)} files but none are valid. Check API keys and rate limits.',
                    'audio_path': audio_path,
                    'timestamp_csv': cleaned_csv_path,
                    'logs': logs
                }

            valid_video_paths = [video_paths[i] for i in valid_indices]
            valid_timestamps = extended_df.iloc[valid_indices].reset_index(drop=True)

            logs.append(f"Valid clips for assembly: {len(valid_video_paths)}")

            # List valid video files with sizes
            for i, vp in enumerate(valid_video_paths):
                size_mb = os.path.getsize(vp) / (1024 * 1024)
                logs.append(f"  Clip {i}: {size_mb:.2f} MB - {valid_timestamps.iloc[i]['word']}")

            self.video_assembler.assemble_video(
                valid_video_paths,
                valid_timestamps,
                audio_path,
                final_video_path,
                aspect_ratio,
                add_subtitles
            )
            logs.append(f"Video assembly complete: {final_video_path}")

            # ===== RETURN RESULTS =====
            return {
                'status': 'success',
                'audio_path': audio_path,
                'timestamp_csv_raw': raw_csv_path,
                'timestamp_csv_cleaned': cleaned_csv_path,
                'video_path': final_video_path,
                'output_directory': output_dir,
                'logs': logs
            }

        except Exception as e:
            logs.append(f"ERROR: {str(e)}")
            import traceback
            logs.append(traceback.format_exc())
            return {
                'status': 'error',
                'message': str(e),
                'logs': logs
            }
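
# Hedged sketch of the JSON override path: values in json_input take precedence over the
# individual keyword arguments, which is how a remote caller (e.g. Google Apps Script)
# can pass a single JSON payload. The narration text and settings below are placeholders.
def _example_json_override() -> Dict[str, Any]:
    """Run the pipeline with settings supplied as a JSON string."""
    config = {
        'text': 'Bananas are berries, but strawberries are not.',
        'voice': 'en-IN-PrabhatNeural',
        'speed': 1.2,
        'aspect_ratio': '9:16',
        'visual_source': 'pexels',
    }
    # On success the result dict contains audio_path, timestamp CSVs and video_path;
    # on failure it contains 'message' plus the accumulated logs.
    return VideoPipeline().process(json_input=json.dumps(config))
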
# ==================== GRADIO INTERFACE ====================

def create_gradio_interface():
    """Create Gradio UI and API"""
    pipeline = VideoPipeline()

    def process_wrapper(
        text_input, markdown_file, audio_file, json_input,
        language, voice, speed, pitch,
        aspect_ratio, visual_source, add_subtitles
    ):
        """Wrapper for Gradio interface"""
        # gr.File yields a tempfile-like object in some Gradio versions and a plain
        # path string in others; getattr handles both
        result = pipeline.process(
            text_input=text_input,
            markdown_file=getattr(markdown_file, 'name', markdown_file) if markdown_file else None,
            audio_file=getattr(audio_file, 'name', audio_file) if audio_file else None,
            json_input=json_input,
            language=language,
            voice=voice,
            speed=speed,
            pitch=pitch,
            aspect_ratio=aspect_ratio,
            visual_source=visual_source,
            add_subtitles=add_subtitles
        )

        # Format logs
        log_text = "\n".join(result.get('logs', []))

        # Return outputs
        if result['status'] == 'success':
            return (
                result.get('audio_path'),
                result.get('timestamp_csv_cleaned'),
                result.get('video_path'),
                log_text,
                json.dumps(result, indent=2)
            )
        else:
            return (
                None,
                None,
                None,
                log_text,
                json.dumps(result, indent=2)
            )
    # ===== GRADIO UI =====
    with gr.Blocks(title="Text/Audio → Video Pipeline") as demo:
        gr.Markdown("""
        # 🎬 Text/Audio → Timestamped Video Pipeline

        **Production-ready Gradio Space for automated video generation**

        This space converts text or voice into a complete narrated video with:
        - Speech synthesis
        - Word-level timestamps
        - Automated visual fetching
        - Final video assembly

        **API Available**: Use this space programmatically via Gradio API
        """)

        with gr.Row():
            with gr.Column():
                gr.Markdown("### 📥 Input (Choose One or Multiple)")

                text_input = gr.Textbox(
                    label="Text Input",
                    placeholder="Enter your narration text here...",
                    lines=5
                )

                markdown_file = gr.File(
                    label="Markdown File (.md)",
                    file_types=['.md']
                )

                audio_file = gr.File(
                    label="Audio File (wav/mp3)",
                    file_types=['.wav', '.mp3', '.m4a']
                )

                json_input = gr.Textbox(
                    label="JSON API Input (Advanced)",
                    placeholder='{"text": "...", "voice": "...", ...}',
                    lines=3
                )

            with gr.Column():
                gr.Markdown("### ⚙️ Configuration")

                language = gr.Dropdown(
                    choices=['auto', 'en', 'es', 'fr', 'de', 'hi', 'ja', 'zh'],
                    value='en',
                    label="Language"
                )

                voice = gr.Dropdown(
                    choices=list(TextToSpeech.VOICES.keys()),
                    value='en-US-AriaNeural',
                    label="Voice"
                )

                speed = gr.Slider(
                    minimum=0.5,
                    maximum=2.0,
                    value=1.0,
                    step=0.1,
                    label="Speech Speed"
                )

                pitch = gr.Slider(
                    minimum=-20,
                    maximum=20,
                    value=0,
                    step=1,
                    label="Pitch (Hz)"
                )

                aspect_ratio = gr.Dropdown(
                    choices=['16:9', '9:16', '1:1', '4:3'],
                    value='16:9',
                    label="Aspect Ratio"
                )

                visual_source = gr.Dropdown(
                    choices=['pexels', 'tenor'],
                    value='pexels',
                    label="Visual Source"
                )

                add_subtitles = gr.Checkbox(
                    label="Add Subtitles (Future)",
                    value=False
                )

        run_btn = gr.Button("🚀 Run Pipeline", variant="primary", size="lg")

        gr.Markdown("### 📤 Outputs")

        with gr.Row():
            audio_output = gr.Audio(label="Generated Narration")
            csv_output = gr.File(label="Cleaned Timestamps (CSV)")
            video_output = gr.Video(label="Final Video")

        logs_output = gr.Textbox(
            label="Execution Logs",
            lines=10,
            max_lines=20
        )

        api_output = gr.JSON(label="API Response (JSON)")

        # Connect interface
        run_btn.click(
            fn=process_wrapper,
            inputs=[
                text_input, markdown_file, audio_file, json_input,
                language, voice, speed, pitch,
                aspect_ratio, visual_source, add_subtitles
            ],
            outputs=[
                audio_output, csv_output, video_output,
                logs_output, api_output
            ]
        )

        gr.Markdown("""
        ---
        ### 🔌 API Usage

        **Endpoint**: Use Gradio Client to call this space programmatically

        ```python
        from gradio_client import Client

        client = Client("YOUR_SPACE_URL")
        result = client.predict(
            text_input="Your narration text",
            markdown_file=None,
            audio_file=None,
            json_input='{"voice": "en-IN-PrabhatNeural", "speed": 1.2}',
            language="en",
            voice="en-IN-PrabhatNeural",
            speed=1.0,
            pitch=0,
            aspect_ratio="16:9",
            visual_source="pexels",
            add_subtitles=False,
            api_name="/predict"
        )
        ```

        **Google Apps Script Integration**: Use UrlFetchApp to POST to the API endpoint
        """)

    return demo
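
# Optional helper (illustrative): a console smoke test that exercises the pipeline
# without starting the web UI. It is not called anywhere by default; invoke it manually
# (e.g. from a Python shell) when debugging a Space build. The sample sentence is a
# placeholder.
def _smoke_test() -> None:
    """Run the pipeline once on a short sentence and print status plus logs."""
    result = VideoPipeline().process(text_input="A quick end-to-end check of the pipeline.")
    print(f"status: {result['status']}")
    for line in result.get('logs', []):
        print(f"  {line}")
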
# ==================== LAUNCH ====================

if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.launch(
        share=False,
        server_name="0.0.0.0",
        server_port=7860
    )