"""
Hugging Face Gradio Space: Text/Audio → Timestamped Video Pipeline
Production-ready, API-first, modular architecture
This space acts as Step 1+2+3 of an autonomous YouTube video pipeline.
Designed to be called programmatically via Gradio API from Google Apps Script.
"""
import gradio as gr
import json
import os
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
import tempfile
import warnings
warnings.filterwarnings('ignore')
# Core dependencies
import torch
import whisper
import edge_tts
import asyncio
import pandas as pd
import numpy as np
import requests
from datetime import datetime
import re
import nltk
import subprocess
# MoviePy imports (handle different versions)
try:
from moviepy.editor import VideoFileClip, AudioFileClip, concatenate_videoclips, CompositeAudioClip
MOVIEPY_AVAILABLE = True
except ImportError:
try:
import moviepy.editor as mpy
VideoFileClip = mpy.VideoFileClip
AudioFileClip = mpy.AudioFileClip
concatenate_videoclips = mpy.concatenate_videoclips
CompositeAudioClip = mpy.CompositeAudioClip
MOVIEPY_AVAILABLE = True
except ImportError:
MOVIEPY_AVAILABLE = False
print("WARNING: MoviePy not available. Using ffmpeg directly for video assembly.")
# Download NLTK data
try:
nltk.data.find('corpora/stopwords')
except LookupError:
nltk.download('stopwords', quiet=True)
from nltk.corpus import stopwords
# ==================== CONFIGURATION ====================
# Common words to filter (configurable)
COMMON_WORDS = set([
'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
'of', 'with', 'is', 'are', 'was', 'were', 'be', 'been', 'being',
'have', 'has', 'had', 'do', 'does', 'did', 'will', 'would', 'should',
'could', 'may', 'might', 'must', 'can', 'it', 'its', 'this', 'that',
'these', 'those', 'i', 'you', 'he', 'she', 'we', 'they', 'them', 'their'
])
# Add NLTK stopwords
try:
    COMMON_WORDS.update(stopwords.words('english'))
except LookupError:
    # NLTK stopwords unavailable; the built-in list above still applies
    pass
# Pexels API (Free tier - register at pexels.com)
PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', '')
# Tenor API (Free tier - register at tenor.com)
TENOR_API_KEY = os.environ.get('TENOR_API_KEY', '')
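# Both keys are optional: without them the pipeline still runs end to end,
# substituting generated placeholder clips for real stock footage.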
# ==================== STEP 1: SPEECH-TO-TEXT ====================
class SpeechToText:
"""Handles audio transcription using Whisper"""
def __init__(self):
self.model = None
def load_model(self, model_size="base"):
"""Load Whisper model (lazy loading)"""
if self.model is None:
print(f"Loading Whisper {model_size} model...")
self.model = whisper.load_model(model_size)
return self.model
def transcribe(self, audio_path: str, language: Optional[str] = None) -> str:
"""
Transcribe audio to text
Args:
audio_path: Path to audio file
language: Language code (e.g., 'en', 'es') or None for auto-detect
Returns:
Transcribed text
"""
model = self.load_model()
result = model.transcribe(
audio_path,
language=language,
fp16=torch.cuda.is_available()
)
return result['text'].strip()
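# Illustrative usage (file name is a placeholder):
#   stt = SpeechToText()
#   text = stt.transcribe("narration.wav")        # auto-detect language
#   text = stt.transcribe("narration.wav", "en")  # force English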
# ==================== STEP 2: TEXT-TO-SPEECH ====================
class TextToSpeech:
"""Handles TTS using Edge TTS (free, high quality)"""
# Available voices (extensible)
VOICES = {
'en-US-AriaNeural': 'English (US) - Aria (Female)',
'en-US-GuyNeural': 'English (US) - Guy (Male)',
'en-GB-SoniaNeural': 'English (UK) - Sonia (Female)',
'en-GB-RyanNeural': 'English (UK) - Ryan (Male)',
'en-IN-NeerjaNeural': 'English (India) - Neerja (Female)',
'en-IN-PrabhatNeural': 'English (India) - Prabhat (Male)',
'es-ES-ElviraNeural': 'Spanish (Spain) - Elvira (Female)',
'fr-FR-DeniseNeural': 'French (France) - Denise (Female)',
'de-DE-KatjaNeural': 'German - Katja (Female)',
'hi-IN-SwaraNeural': 'Hindi - Swara (Female)',
}
async def synthesize_async(
self,
text: str,
voice: str = 'en-US-AriaNeural',
rate: str = '+0%',
pitch: str = '+0Hz',
output_path: str = 'output.wav'
) -> str:
"""
Synthesize speech from text (async)
Args:
text: Input text
voice: Voice ID
rate: Speed adjustment (e.g., '+10%', '-20%')
pitch: Pitch adjustment (e.g., '+5Hz', '-10Hz')
output_path: Output file path
Returns:
Path to generated audio file
"""
communicate = edge_tts.Communicate(text, voice, rate=rate, pitch=pitch)
await communicate.save(output_path)
return output_path
def synthesize(self, text: str, voice: str, rate: float, pitch: int, output_path: str) -> str:
"""Sync wrapper for TTS"""
# Convert rate (1.0 = normal) to percentage
rate_str = f"{int((rate - 1.0) * 100):+d}%"
pitch_str = f"{pitch:+d}Hz"
return asyncio.run(self.synthesize_async(text, voice, rate_str, pitch_str, output_path))
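# Illustrative usage: the sync wrapper maps speed=1.2 to rate "+20%" and
# pitch=-5 to "-5Hz" before calling edge-tts (output path is a placeholder):
#   TextToSpeech().synthesize("Hello world", "en-US-AriaNeural", 1.2, 0, "out.mp3")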
# ==================== STEP 3: WORD-LEVEL TIMESTAMPS ====================
class TimestampGenerator:
"""Generate word-level timestamps using Whisper"""
def __init__(self):
self.model = None
def load_model(self, model_size="base"):
"""Load Whisper model"""
if self.model is None:
self.model = whisper.load_model(model_size)
return self.model
def generate_timestamps(self, audio_path: str) -> pd.DataFrame:
"""
Generate word-level timestamps
Args:
audio_path: Path to audio file
Returns:
DataFrame with columns: word, start, end, duration
"""
model = self.load_model()
result = model.transcribe(
audio_path,
word_timestamps=True,
fp16=torch.cuda.is_available()
)
timestamps = []
for segment in result['segments']:
if 'words' in segment:
for word_info in segment['words']:
timestamps.append({
'word': word_info['word'].strip(),
'start': word_info['start'],
'end': word_info['end'],
'duration': word_info['end'] - word_info['start']
})
return pd.DataFrame(timestamps)
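# Illustrative output, one row per spoken word:
#       word  start   end  duration
#   0  Hello   0.00  0.42      0.42
#   1  world   0.42  0.88      0.46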
# ==================== STEP 4: TIMESTAMP CLEANING ====================
class TimestampCleaner:
"""Clean and extend timestamps for scene alignment"""
def __init__(self, common_words: set = COMMON_WORDS):
self.common_words = common_words
def is_meaningful(self, word: str) -> bool:
"""Check if word is meaningful (not common/stopword)"""
word_lower = word.lower().strip()
# Remove punctuation
word_clean = re.sub(r'[^\w\s]', '', word_lower)
if len(word_clean) < 2:
return False
if word_clean in self.common_words:
return False
if word_clean.isdigit():
return False
return True
def remove_repetitive(self, words: List[str], window: int = 3) -> List[bool]:
"""Mark repetitive words within a window"""
keep = [True] * len(words)
for i in range(len(words)):
word = words[i].lower().strip()
# Check if this word appears in the next 'window' words
for j in range(i + 1, min(i + window + 1, len(words))):
if words[j].lower().strip() == word:
keep[j] = False
return keep
def clean_timestamps(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Clean timestamps: remove common words, repetitive words
Args:
df: DataFrame with word-level timestamps
Returns:
Cleaned DataFrame
"""
# Filter meaningful words
df['meaningful'] = df['word'].apply(self.is_meaningful)
# Remove repetitive words
keep_mask = self.remove_repetitive(df['word'].tolist())
df['not_repetitive'] = keep_mask
# Combine filters
df_cleaned = df[df['meaningful'] & df['not_repetitive']].copy()
return df_cleaned[['word', 'start', 'end', 'duration']].reset_index(drop=True)
def extend_timestamps(self, df: pd.DataFrame, min_duration: float = 2.0) -> pd.DataFrame:
"""
Extend timestamp durations to next word (scene duration logic)
Args:
df: Cleaned timestamps
min_duration: Minimum scene duration
Returns:
DataFrame with extended durations
"""
df = df.copy()
for i in range(len(df) - 1):
# Extend to next word's start time
df.loc[i, 'end'] = df.loc[i + 1, 'start']
df.loc[i, 'duration'] = df.loc[i, 'end'] - df.loc[i, 'start']
# Ensure minimum duration
df['duration'] = df['duration'].clip(lower=min_duration)
df['end'] = df['start'] + df['duration']
return df
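# Illustrative scene math: meaningful words starting at 0.0s, 1.2s and 4.0s
# get their ends stretched to the next word's start (durations 1.2s, 2.8s);
# clipping then raises anything under min_duration up to 2.0s, so each visual
# stays on screen until the next scene begins.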
# ==================== STEP 5: VISUAL FETCHING ====================
class VisualFetcher:
"""Fetch videos/GIFs from Pexels or Tenor"""
def __init__(self, pexels_key: str = PEXELS_API_KEY, tenor_key: str = TENOR_API_KEY):
self.pexels_key = pexels_key
self.tenor_key = tenor_key
def create_placeholder_video(self, output_path: str, duration: float = 3.0, text: str = "") -> bool:
"""Create a placeholder video using ffmpeg when real video unavailable"""
        try:
            # Strip characters that would break the drawtext filter expression
            safe_text = re.sub(r"[':\\%,]", '', text)
            # Create a solid-color video with a text overlay
            color = "0x2C3E50"  # Dark blue-gray
            cmd = [
                'ffmpeg', '-y',
                '-f', 'lavfi', '-i', f'color=c={color}:s=1920x1080:d={duration}',
                '-vf', f"drawtext=text='{safe_text}':fontcolor=white:fontsize=60:x=(w-text_w)/2:y=(h-text_h)/2",
                '-c:v', 'libx264', '-t', str(duration), '-pix_fmt', 'yuv420p',
                output_path
            ]
subprocess.run(cmd, check=True, capture_output=True, timeout=30)
if os.path.exists(output_path) and os.path.getsize(output_path) > 1000:
print(f"Created placeholder video: {output_path}")
return True
except Exception as e:
print(f"Failed to create placeholder: {e}")
return False
def search_pexels(self, query: str, per_page: int = 1) -> Optional[str]:
"""Search Pexels for video URL"""
if not self.pexels_key:
print("WARNING: PEXELS_API_KEY not set")
return None
url = "https://api.pexels.com/videos/search"
headers = {"Authorization": self.pexels_key}
params = {"query": query, "per_page": per_page, "orientation": "landscape"}
try:
response = requests.get(url, headers=headers, params=params, timeout=10)
response.raise_for_status()
data = response.json()
if data.get('videos'):
# Get medium quality video
video_files = data['videos'][0]['video_files']
for vf in video_files:
if vf.get('quality') in ['hd', 'sd']:
print(f"Found Pexels video: {vf['quality']} - {vf.get('width')}x{vf.get('height')}")
return vf['link']
# Fallback to first available
if video_files:
print(f"Using fallback video file")
return video_files[0]['link']
else:
print(f"No Pexels results for: {query}")
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429:
print(f"Pexels rate limit exceeded for '{query}'")
else:
print(f"Pexels HTTP error for '{query}': {e}")
except Exception as e:
print(f"Pexels error for '{query}': {e}")
return None
def search_tenor(self, query: str, limit: int = 1) -> Optional[str]:
"""Search Tenor for GIF URL"""
if not self.tenor_key:
return None
url = "https://tenor.googleapis.com/v2/search"
params = {
"q": query,
"key": self.tenor_key,
"limit": limit,
"media_filter": "mp4"
}
try:
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()
            if data.get('results'):
                return data['results'][0]['media_formats']['mp4']['url']
except Exception as e:
print(f"Tenor error for '{query}': {e}")
return None
def download_video(self, url: str, output_path: str) -> bool:
"""Download video from URL"""
try:
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Verify file was downloaded and has content
if os.path.exists(output_path) and os.path.getsize(output_path) > 1000:
return True
else:
print(f"Downloaded file is invalid or too small: {output_path}")
return False
except Exception as e:
print(f"Download error: {e}")
return False
def fetch_visuals(
self,
queries: List[str],
output_dir: str,
source: str = 'pexels',
use_placeholders: bool = True
) -> List[str]:
"""
Fetch and download videos for list of queries
Args:
queries: List of search queries
output_dir: Directory to save videos
source: 'pexels' or 'tenor'
use_placeholders: Create placeholder videos if download fails
Returns:
List of downloaded video paths (None for failed downloads)
"""
Path(output_dir).mkdir(parents=True, exist_ok=True)
video_paths = []
for i, query in enumerate(queries, 1):
print(f"Fetching visual {i}/{len(queries)}: {query}")
output_path = os.path.join(output_dir, f"{i}.mp4")
success = False
# Try to download real video
if source == 'pexels':
video_url = self.search_pexels(query)
elif source == 'tenor':
video_url = self.search_tenor(query)
else:
video_url = None
if video_url:
success = self.download_video(video_url, output_path)
if success and os.path.exists(output_path):
print(f"✓ Downloaded: {output_path} ({os.path.getsize(output_path)} bytes)")
video_paths.append(output_path)
continue
# Fallback to placeholder if download failed
if use_placeholders:
print(f"⚠ Creating placeholder for: {query}")
if self.create_placeholder_video(output_path, duration=3.0, text=query[:30]):
video_paths.append(output_path)
else:
print(f"✗ Failed to create placeholder for: {query}")
video_paths.append(None)
else:
print(f"✗ No video available for: {query}")
video_paths.append(None)
# Log summary
valid_count = sum(1 for p in video_paths if p is not None)
print(f"Download summary: {valid_count}/{len(queries)} videos available")
return video_paths
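# Illustrative usage (assumes PEXELS_API_KEY is set; otherwise generated
# placeholder clips are used, with None only if even that fails):
#   fetcher = VisualFetcher()
#   paths = fetcher.fetch_visuals(["ocean", "sunrise"], "/tmp/visuals")
#   # -> ["/tmp/visuals/1.mp4", "/tmp/visuals/2.mp4"]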
# ==================== STEP 6: VIDEO ASSEMBLY ====================
# Type-checking-only import so the VideoFileClip annotations below resolve
# even when MoviePy is not installed at runtime.
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from moviepy.editor import VideoFileClip
class VideoAssembler:
"""Assemble final video from clips + audio + timestamps"""
def __init__(self):
self.use_moviepy = MOVIEPY_AVAILABLE
def resize_clip_moviepy(self, clip: "VideoFileClip", aspect_ratio: str) -> "VideoFileClip":
"""Resize clip to target aspect ratio using MoviePy"""
target_ratios = {
'16:9': (1920, 1080),
'9:16': (1080, 1920),
'1:1': (1080, 1080),
'4:3': (1440, 1080)
}
target_w, target_h = target_ratios.get(aspect_ratio, (1920, 1080))
# Calculate scale to cover target area
scale_w = target_w / clip.w
scale_h = target_h / clip.h
scale = max(scale_w, scale_h)
# Resize and crop
resized = clip.resize(scale)
# Center crop
x_center = resized.w / 2
y_center = resized.h / 2
x1 = x_center - target_w / 2
y1 = y_center - target_h / 2
return resized.crop(x1=x1, y1=y1, width=target_w, height=target_h)
def assemble_with_ffmpeg(
self,
video_paths: List[str],
timestamps: pd.DataFrame,
audio_path: str,
output_path: str,
aspect_ratio: str = '16:9'
) -> str:
"""Assemble video using ffmpeg directly (fallback method)"""
target_ratios = {
'16:9': (1920, 1080),
'9:16': (1080, 1920),
'1:1': (1080, 1080),
'4:3': (1440, 1080)
}
target_w, target_h = target_ratios.get(aspect_ratio, (1920, 1080))
# Create temporary directory for processed clips
temp_dir = os.path.join(os.path.dirname(output_path), 'temp_clips')
Path(temp_dir).mkdir(parents=True, exist_ok=True)
processed_clips = []
# Process each clip with ffmpeg
for i, (video_path, row) in enumerate(zip(video_paths, timestamps.itertuples())):
if not video_path or not os.path.exists(video_path):
print(f"Skipping clip {i}: Invalid path")
continue
# Verify file exists and has size
file_size = os.path.getsize(video_path)
if file_size < 1000:
print(f"Skipping clip {i}: File too small ({file_size} bytes)")
continue
temp_output = os.path.join(temp_dir, f'clip_{i:04d}.mp4')
try:
print(f"Processing clip {i}: {video_path} -> {temp_output}")
# Resize, crop, and set duration using ffmpeg
result = subprocess.run([
'ffmpeg', '-y', '-i', video_path,
'-vf', f'scale={target_w}:{target_h}:force_original_aspect_ratio=increase,crop={target_w}:{target_h}',
'-t', str(row.duration),
'-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
'-an', # Remove audio from clips
temp_output
], check=True, capture_output=True, text=True)
# Verify output exists
if os.path.exists(temp_output) and os.path.getsize(temp_output) > 1000:
processed_clips.append(temp_output)
print(f"✓ Processed clip {i}: {os.path.getsize(temp_output)} bytes")
else:
print(f"✗ Failed to process clip {i}: Output invalid")
except subprocess.CalledProcessError as e:
print(f"✗ ffmpeg error for clip {i}:")
print(f" stdout: {e.stdout}")
print(f" stderr: {e.stderr}")
continue
except Exception as e:
print(f"✗ Unexpected error processing clip {i}: {e}")
continue
if not processed_clips:
raise ValueError(f"No valid clips processed. Checked {len(video_paths)} input videos.")
print(f"Successfully processed {len(processed_clips)} clips")
# Create concat file
concat_file = os.path.join(temp_dir, 'concat.txt')
with open(concat_file, 'w') as f:
for clip in processed_clips:
f.write(f"file '{os.path.abspath(clip)}'\n")
print(f"Concatenating {len(processed_clips)} clips...")
# Concatenate clips
temp_video = os.path.join(temp_dir, 'concatenated.mp4')
subprocess.run([
'ffmpeg', '-y', '-f', 'concat', '-safe', '0', '-i', concat_file,
'-c', 'copy', temp_video
], check=True, capture_output=True)
print(f"Adding audio track...")
# Add audio
subprocess.run([
'ffmpeg', '-y', '-i', temp_video, '-i', audio_path,
'-c:v', 'copy', '-c:a', 'aac', '-shortest',
output_path
], check=True, capture_output=True)
print(f"✓ Final video created: {output_path}")
# Cleanup
shutil.rmtree(temp_dir, ignore_errors=True)
return output_path
def assemble_with_moviepy(
self,
video_paths: List[str],
timestamps: pd.DataFrame,
audio_path: str,
output_path: str,
aspect_ratio: str = '16:9'
) -> str:
"""Assemble video using MoviePy"""
clips = []
for i, (video_path, row) in enumerate(zip(video_paths, timestamps.itertuples())):
if video_path and os.path.exists(video_path):
try:
clip = VideoFileClip(video_path)
                    # Resize to aspect ratio
                    clip = self.resize_clip_moviepy(clip, aspect_ratio)
                    # Trim to the scene duration; never declare a duration longer
                    # than the source clip, which makes MoviePy fail once it
                    # tries to read frames past the end of the file
                    scene_duration = min(row.duration, clip.duration) if clip.duration else row.duration
                    clip = clip.subclip(0, scene_duration)
clips.append(clip)
except Exception as e:
print(f"Error processing clip {i}: {e}")
continue
if not clips:
raise ValueError("No valid video clips to assemble")
# Concatenate clips
final_video = concatenate_videoclips(clips, method='compose')
# Add audio
if os.path.exists(audio_path):
audio = AudioFileClip(audio_path)
final_video = final_video.set_audio(audio)
# Write output
final_video.write_videofile(
output_path,
codec='libx264',
audio_codec='aac',
fps=24,
preset='medium',
threads=4
)
# Cleanup
for clip in clips:
clip.close()
final_video.close()
return output_path
def assemble_video(
self,
video_paths: List[str],
timestamps: pd.DataFrame,
audio_path: str,
output_path: str,
aspect_ratio: str = '16:9',
add_subtitles: bool = False
) -> str:
"""
Assemble final video (auto-selects MoviePy or ffmpeg)
Args:
video_paths: List of video clip paths
timestamps: DataFrame with word timestamps
audio_path: Path to narration audio
output_path: Output video path
aspect_ratio: Target aspect ratio
add_subtitles: Whether to add subtitles (future implementation)
Returns:
Path to final video
"""
if self.use_moviepy:
try:
return self.assemble_with_moviepy(
video_paths, timestamps, audio_path, output_path, aspect_ratio
)
except Exception as e:
print(f"MoviePy failed: {e}. Falling back to ffmpeg...")
return self.assemble_with_ffmpeg(
video_paths, timestamps, audio_path, output_path, aspect_ratio
)
else:
return self.assemble_with_ffmpeg(
video_paths, timestamps, audio_path, output_path, aspect_ratio
)
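# Illustrative usage: MoviePy is used when importable, with the ffmpeg CLI as
# the fallback (inputs are placeholders):
#   VideoAssembler().assemble_video(clip_paths, scenes_df, "narration.wav",
#                                   "final.mp4", aspect_ratio="9:16")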
# ==================== MAIN PIPELINE ====================
class VideoPipeline:
"""Main pipeline orchestrator"""
def __init__(self):
self.stt = SpeechToText()
self.tts = TextToSpeech()
self.timestamp_gen = TimestampGenerator()
self.timestamp_cleaner = TimestampCleaner()
self.visual_fetcher = VisualFetcher()
self.video_assembler = VideoAssembler()
def process(
self,
# Inputs
text_input: Optional[str] = None,
markdown_file: Optional[str] = None,
audio_file: Optional[str] = None,
json_input: Optional[str] = None,
# Configuration
language: str = 'en',
voice: str = 'en-US-AriaNeural',
speed: float = 1.0,
pitch: int = 0,
aspect_ratio: str = '16:9',
visual_source: str = 'pexels',
add_subtitles: bool = False,
        # Output directory
        output_dir: Optional[str] = None
) -> Dict[str, Any]:
"""
Main pipeline execution
Returns:
Dictionary with paths to all outputs and logs
"""
# Create output directory
if output_dir is None:
output_dir = tempfile.mkdtemp(prefix='video_pipeline_')
Path(output_dir).mkdir(parents=True, exist_ok=True)
logs = []
try:
# ===== INPUT PRIORITY LOGIC =====
# 1. JSON override
if json_input:
try:
config = json.loads(json_input)
text_input = config.get('text', text_input)
voice = config.get('voice', voice)
speed = config.get('speed', speed)
pitch = config.get('pitch', pitch)
aspect_ratio = config.get('aspect_ratio', aspect_ratio)
visual_source = config.get('visual_source', visual_source)
add_subtitles = config.get('subtitles', add_subtitles)
language = config.get('language', language)
logs.append("JSON config loaded")
except json.JSONDecodeError as e:
logs.append(f"JSON parse error: {e}")
# 2. Audio transcription
if audio_file and os.path.exists(audio_file):
logs.append("Transcribing audio...")
text_input = self.stt.transcribe(audio_file, language if language != 'auto' else None)
logs.append(f"Transcription: {text_input[:100]}...")
# 3. Markdown extraction
elif markdown_file and os.path.exists(markdown_file):
logs.append("Reading markdown file...")
with open(markdown_file, 'r', encoding='utf-8') as f:
text_input = f.read()
logs.append(f"Markdown text: {text_input[:100]}...")
            # 4. Direct text (the default when no other input populated text_input)
            if not text_input or not text_input.strip():
return {
'status': 'error',
'message': 'No input provided',
'logs': logs
}
text_input = text_input.strip()
# ===== STEP 2: TEXT-TO-SPEECH =====
audio_path = os.path.join(output_dir, 'narration.wav')
logs.append(f"Generating speech with voice: {voice}")
self.tts.synthesize(text_input, voice, speed, pitch, audio_path)
logs.append(f"Audio generated: {audio_path}")
# ===== STEP 3: WORD-LEVEL TIMESTAMPS =====
logs.append("Generating word-level timestamps...")
timestamps_df = self.timestamp_gen.generate_timestamps(audio_path)
raw_csv_path = os.path.join(output_dir, 'timestamps_raw.csv')
timestamps_df.to_csv(raw_csv_path, index=False)
logs.append(f"Raw timestamps: {len(timestamps_df)} words")
# ===== STEP 4: TIMESTAMP CLEANING =====
logs.append("Cleaning timestamps...")
cleaned_df = self.timestamp_cleaner.clean_timestamps(timestamps_df)
logs.append(f"Cleaned: {len(cleaned_df)} meaningful words")
logs.append("Extending timestamp durations...")
extended_df = self.timestamp_cleaner.extend_timestamps(cleaned_df)
cleaned_csv_path = os.path.join(output_dir, 'timestamps_cleaned.csv')
extended_df.to_csv(cleaned_csv_path, index=False)
logs.append(f"Extended timestamps saved: {cleaned_csv_path}")
# ===== STEP 5: VISUAL FETCHING =====
logs.append(f"Fetching visuals from {visual_source}...")
queries = extended_df['word'].tolist()
visuals_dir = os.path.join(output_dir, 'visuals')
video_paths = self.visual_fetcher.fetch_visuals(queries, visuals_dir, visual_source)
valid_count = sum(1 for p in video_paths if p)
logs.append(f"Downloaded {valid_count}/{len(queries)} videos")
# ===== STEP 6: VIDEO ASSEMBLY =====
logs.append("Assembling final video...")
final_video_path = os.path.join(output_dir, 'final_video.mp4')
# Filter out None paths and verify files exist
valid_indices = []
for i, p in enumerate(video_paths):
if p and os.path.exists(p) and os.path.getsize(p) > 1000:
valid_indices.append(i)
else:
if p:
logs.append(f"Skipping invalid video {i}: {p}")
if not valid_indices:
return {
'status': 'error',
'message': f'No valid video clips available. Downloaded {len(video_paths)} files but none are valid. Check API keys and rate limits.',
'audio_path': audio_path,
'timestamp_csv': cleaned_csv_path,
'logs': logs
}
valid_video_paths = [video_paths[i] for i in valid_indices]
valid_timestamps = extended_df.iloc[valid_indices].reset_index(drop=True)
logs.append(f"Valid clips for assembly: {len(valid_video_paths)}")
# List valid video files with sizes
for i, vp in enumerate(valid_video_paths):
size_mb = os.path.getsize(vp) / (1024 * 1024)
logs.append(f" Clip {i}: {size_mb:.2f} MB - {valid_timestamps.iloc[i]['word']}")
self.video_assembler.assemble_video(
valid_video_paths,
valid_timestamps,
audio_path,
final_video_path,
aspect_ratio,
add_subtitles
)
logs.append(f"Video assembly complete: {final_video_path}")
# ===== RETURN RESULTS =====
return {
'status': 'success',
'audio_path': audio_path,
'timestamp_csv_raw': raw_csv_path,
'timestamp_csv_cleaned': cleaned_csv_path,
'video_path': final_video_path,
'output_directory': output_dir,
'logs': logs
}
except Exception as e:
logs.append(f"ERROR: {str(e)}")
import traceback
logs.append(traceback.format_exc())
return {
'status': 'error',
'message': str(e),
'logs': logs
}
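# Illustrative programmatic usage, bypassing the Gradio UI entirely:
#   result = VideoPipeline().process(text_input="A short test narration.")
#   if result['status'] == 'success':
#       print(result['video_path'], result['timestamp_csv_cleaned'])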
# ==================== GRADIO INTERFACE ====================
def create_gradio_interface():
"""Create Gradio UI and API"""
pipeline = VideoPipeline()
def process_wrapper(
text_input,
markdown_file,
audio_file,
json_input,
language,
voice,
speed,
pitch,
aspect_ratio,
visual_source,
add_subtitles
):
"""Wrapper for Gradio interface"""
result = pipeline.process(
text_input=text_input,
            # gr.File yields a tempfile wrapper in Gradio 3.x but a plain path
            # string in 4.x; handle both
            markdown_file=getattr(markdown_file, 'name', markdown_file) if markdown_file else None,
            audio_file=getattr(audio_file, 'name', audio_file) if audio_file else None,
json_input=json_input,
language=language,
voice=voice,
speed=speed,
pitch=pitch,
aspect_ratio=aspect_ratio,
visual_source=visual_source,
add_subtitles=add_subtitles
)
# Format logs
log_text = "\n".join(result.get('logs', []))
# Return outputs
if result['status'] == 'success':
return (
result.get('audio_path'),
result.get('timestamp_csv_cleaned'),
result.get('video_path'),
log_text,
json.dumps(result, indent=2)
)
else:
return (
None,
None,
None,
log_text,
json.dumps(result, indent=2)
)
# ===== GRADIO UI =====
with gr.Blocks(title="Text/Audio → Video Pipeline") as demo:
gr.Markdown("""
# 🎬 Text/Audio → Timestamped Video Pipeline
**Production-ready Gradio Space for automated video generation**
This space converts text or voice into a complete narrated video with:
- Speech synthesis
- Word-level timestamps
- Automated visual fetching
- Final video assembly
        **API Available**: Use this Space programmatically via the Gradio API
""")
with gr.Row():
with gr.Column():
gr.Markdown("### 📥 Input (Choose One or Multiple)")
text_input = gr.Textbox(
label="Text Input",
placeholder="Enter your narration text here...",
lines=5
)
markdown_file = gr.File(
label="Markdown File (.md)",
file_types=['.md']
)
audio_file = gr.File(
label="Audio File (wav/mp3)",
file_types=['.wav', '.mp3', '.m4a']
)
json_input = gr.Textbox(
label="JSON API Input (Advanced)",
placeholder='{"text": "...", "voice": "...", ...}',
lines=3
)
with gr.Column():
gr.Markdown("### ⚙️ Configuration")
language = gr.Dropdown(
choices=['auto', 'en', 'es', 'fr', 'de', 'hi', 'ja', 'zh'],
value='en',
label="Language"
)
voice = gr.Dropdown(
choices=list(TextToSpeech.VOICES.keys()),
value='en-US-AriaNeural',
label="Voice"
)
speed = gr.Slider(
minimum=0.5,
maximum=2.0,
value=1.0,
step=0.1,
label="Speech Speed"
)
pitch = gr.Slider(
minimum=-20,
maximum=20,
value=0,
step=1,
label="Pitch (Hz)"
)
aspect_ratio = gr.Dropdown(
choices=['16:9', '9:16', '1:1', '4:3'],
value='16:9',
label="Aspect Ratio"
)
visual_source = gr.Dropdown(
choices=['pexels', 'tenor'],
value='pexels',
label="Visual Source"
)
add_subtitles = gr.Checkbox(
label="Add Subtitles (Future)",
value=False
)
run_btn = gr.Button("🚀 Run Pipeline", variant="primary", size="lg")
gr.Markdown("### 📤 Outputs")
with gr.Row():
audio_output = gr.Audio(label="Generated Narration")
csv_output = gr.File(label="Cleaned Timestamps (CSV)")
video_output = gr.Video(label="Final Video")
logs_output = gr.Textbox(
label="Execution Logs",
lines=10,
max_lines=20
)
api_output = gr.JSON(label="API Response (JSON)")
# Connect interface
run_btn.click(
fn=process_wrapper,
inputs=[
text_input,
markdown_file,
audio_file,
json_input,
language,
voice,
speed,
pitch,
aspect_ratio,
visual_source,
add_subtitles
],
outputs=[
audio_output,
csv_output,
video_output,
logs_output,
api_output
            ],
            api_name="predict"  # exposes this event as /predict for API clients
        )
gr.Markdown("""
---
### 🔌 API Usage
    **Endpoint**: Use the Gradio Client to call this Space programmatically
```python
from gradio_client import Client
client = Client("YOUR_SPACE_URL")
result = client.predict(
text_input="Your narration text",
markdown_file=None,
audio_file=None,
json_input='{"voice": "en-IN-PrabhatNeural", "speed": 1.2}',
language="en",
voice="en-IN-PrabhatNeural",
speed=1.0,
pitch=0,
aspect_ratio="16:9",
visual_source="pexels",
add_subtitles=False,
api_name="/predict"
)
```
**Google Apps Script Integration**: Use UrlFetchApp to POST to the API endpoint
""")
return demo
# ==================== LAUNCH ====================
if __name__ == "__main__":
demo = create_gradio_interface()
demo.launch(
share=False,
server_name="0.0.0.0",
server_port=7860
)