# Page-header residue from the hosting site ("Spaces: Sleeping") - not part of the program.
import io
import logging
import os
import random
import re
import shutil
import tempfile
import textwrap
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional, List, Dict, Tuple

# The wildcard import comes first in this group so that the explicit imports
# below always win if a name ever collides.
from moviepy.editor import *
import numpy as np
import requests
import streamlit as st
import torch
from gtts import gTTS
from PIL import Image, ImageDraw, ImageFont
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
class EnhancedVideoGenerator:
    """Render a narrated, captioned video from a text script.

    The pipeline is: pick up to five topics from the script, try to obtain one
    background image per topic, synthesize a gTTS voice-over, then render
    captioned frames (with a progress bar) and encode them with moviepy.

    Instances own a temporary working directory; use the object as a context
    manager (or call ``cleanup()``) to remove it.

    Per-instance settings (plain attributes, overridable after construction):
        fps: frames per second of the rendered video.
        include_music: whether ``create_video`` tries to mix in
            ``assets/music/background.mp3``.
        use_premium_voice: accepted for UI compatibility; only gTTS is
            implemented in this class.
        stability_api: optional image client exposing
            ``generate(prompt=..., samples=..., width=..., height=...)``;
            ``None`` disables AI backgrounds.
    """

    # Class-level defaults so the attributes always exist, even on an instance
    # whose initialization only partially ran.
    fps = 30
    include_music = True
    use_premium_voice = False
    stability_api = None
    image_model = None

    def __init__(self):
        """Initialize the video generator with all required components.

        Raises:
            RuntimeError: if any setup step fails; the original exception is
                attached as ``__cause__``.
        """
        try:
            self.setup_logging()
            self.setup_device()
            self.initialize_models()
            self.setup_workspace()
            self.load_assets()
            self.setup_themes()
        except Exception as e:
            # self.logger may not exist yet, so log through the root logger.
            logging.error(f"Initialization failed: {str(e)}")
            raise RuntimeError("Failed to initialize video generator") from e

    def setup_logging(self):
        """Configure root logging (file + stream) and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('video_generator.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def setup_device(self):
        """Select ``"cuda"`` when torch reports a GPU, otherwise ``"cpu"``."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.logger.info(f"Using device: {self.device}")

    def initialize_models(self):
        """Load the text model (required) and the image model (best effort).

        Raises:
            Exception: whatever the text-generation pipeline raises on failure.
        """
        try:
            # Text generation model. NOTE(review): no method in this class
            # calls self.text_generator yet.
            self.text_generator = pipeline(
                'text-generation',
                model='gpt2',
                device=0 if self.device == "cuda" else -1,
            )
        except Exception as e:
            self.logger.error(f"Model initialization failed: {str(e)}")
            raise

        # NOTE(review): "CompVis/stable-diffusion-v1-4" is a diffusers-format
        # checkpoint, so loading it as a causal LM is expected to fail; a
        # proper fix needs the `diffusers` package, which this file does not
        # import. The load is therefore best effort: a failure must not stop
        # the generator, which can still render plain themed backgrounds.
        try:
            self.image_model = AutoModelForCausalLM.from_pretrained(
                "CompVis/stable-diffusion-v1-4",
                torch_dtype=torch.float16 if self.device == "cuda" else torch.float32,
            ).to(self.device)
        except Exception as e:
            self.image_model = None
            self.logger.warning(
                "Image model unavailable, using themed backgrounds only: %s", e
            )

    def setup_workspace(self):
        """Create the temporary working directory and its ``assets`` folder."""
        self.temp_dir = Path(tempfile.mkdtemp())
        self.asset_dir = self.temp_dir / "assets"
        self.asset_dir.mkdir(exist_ok=True)

    def setup_themes(self):
        """Define the RGB colour palettes, keyed by style name."""
        self.themes = {
            'Professional': {
                'bg': (240, 240, 240),
                'accent': (0, 120, 212),
                'text': (33, 33, 33),
            },
            'Creative': {
                'bg': (255, 250, 240),
                'accent': (255, 123, 0),
                'text': (51, 51, 51),
            },
            'Educational': {
                'bg': (248, 249, 250),
                'accent': (40, 167, 69),
                'text': (33, 37, 41),
            },
        }

    def load_assets(self):
        """Load the caption font, trying several paths before the default.

        ``self.font`` is always set when this method returns normally.
        """
        font_options = [
            "arial.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            "/System/Library/Fonts/Helvetica.ttc",
        ]
        font = None
        for font_path in font_options:
            try:
                font = ImageFont.truetype(font_path, 40)
                break
            except OSError:
                # Font file not present on this system; try the next one.
                continue
            except Exception as e:
                # Anything else will not be cured by another path.
                self.logger.error(f"Asset loading failed: {str(e)}")
                break
        if font is None:
            font = ImageFont.load_default()
            self.logger.warning("Using default font - custom font loading failed")
        self.font = font

    def generate_visual_assets(self, script: str, style: str) -> List[Dict]:
        """Return one image asset dict per topic that produced an image.

        Each dict has the keys ``'type'`` (always ``'image'``), ``'data'`` and
        ``'topic'``. Returns an empty list on any failure.
        """
        try:
            assets = []
            for topic in self.extract_key_topics(script):
                image = self.generate_ai_image(topic, style)
                if image:
                    assets.append({'type': 'image', 'data': image, 'topic': topic})
            return assets
        except Exception as e:
            self.logger.error(f"Visual asset generation failed: {str(e)}")
            return []

    def create_enhanced_frame(
        self,
        text: str,
        theme: dict,
        frame_number: int,
        total_frames: int,
        background_image: Optional["Image.Image"] = None,
        size: Tuple[int, int] = (1920, 1080),  # Upgraded to 1080p
    ) -> np.ndarray:
        """Render one captioned frame.

        Args:
            text: caption text; cleaned and wrapped at 50 characters.
            theme: palette dict with at least ``'bg'`` and ``'accent'``.
            frame_number: index of this frame, used for the progress bar.
            total_frames: total number of frames in the video.
            background_image: optional image stretched to ``size``; when
                omitted the theme background colour is used.
            size: ``(width, height)`` of the frame in pixels.

        Returns:
            A ``(height, width, 3)`` uint8 RGB array. On any error a plain
            frame filled with the theme background colour is returned.
        """
        try:
            if background_image is not None:
                base = background_image.convert('RGB').resize(size, Image.LANCZOS)
            else:
                base = Image.new('RGB', size, tuple(theme['bg']))

            # Wash the background with translucent white so the caption reads.
            overlay = Image.new('RGBA', size, (255, 255, 255, 100))
            img = Image.alpha_composite(base.convert('RGBA'), overlay).convert('RGB')

            # The draw handle must be created AFTER compositing: compositing
            # returns a new image, and a handle bound to the old one would
            # draw onto a discarded picture. RGBA draw mode on an RGB image
            # blends translucent fills into the pixels.
            draw = ImageDraw.Draw(img, 'RGBA')

            wrapped_text = textwrap.fill(self.clean_text(text), width=50)

            # Centre the caption horizontally, 100 px above the bottom edge.
            left, top, right, bottom = draw.textbbox(
                (0, 0), wrapped_text, font=self.font
            )
            text_width = right - left
            text_height = bottom - top
            text_x = (size[0] - text_width) // 2
            text_y = size[1] - text_height - 100

            padding = 20
            draw.rectangle(
                [
                    text_x - padding,
                    text_y - padding,
                    text_x + text_width + padding,
                    text_y + text_height + padding,
                ],
                fill=(0, 0, 0, 160),  # Semi-transparent black
            )
            draw.text(
                (text_x, text_y),
                wrapped_text,
                fill=(255, 255, 255, 255),
                font=self.font,
            )

            self.draw_animated_progress_bar(
                draw, frame_number, total_frames, size, theme
            )
            return np.array(img)
        except Exception as e:
            self.logger.error(f"Frame creation failed: {str(e)}")
            # Return fallback frame
            return np.full((size[1], size[0], 3), theme['bg'], dtype=np.uint8)

    def draw_animated_progress_bar(
        self,
        draw: "ImageDraw.ImageDraw",
        frame_number: int,
        total_frames: int,
        size: Tuple[int, int],
        theme: dict,
    ):
        """Draw the progress bar for ``frame_number`` near the bottom edge.

        The bar spans 80% of the frame width; the filled part fades in from
        transparent to opaque accent colour, followed by a white marker.
        Errors are logged and swallowed so a bar failure never loses a frame.
        """
        try:
            # Guard against an empty video and clamp out-of-range indices.
            progress = frame_number / total_frames if total_frames > 0 else 0.0
            progress = min(max(progress, 0.0), 1.0)

            bar_width = int(size[0] * 0.8)  # 80% of screen width
            bar_height = 6
            x_offset = (size[0] - bar_width) // 2
            y_position = size[1] - 40

            # Background track
            draw.rectangle(
                [x_offset, y_position, x_offset + bar_width, y_position + bar_height],
                fill=(200, 200, 200, 160),
            )

            # Filled part: one vertical line per pixel, alpha rising with x.
            accent = tuple(theme['accent'][:3])
            progress_width = int(bar_width * progress)
            for x in range(progress_width):
                alpha = int(255 * (x / bar_width))
                draw.line(
                    [x_offset + x, y_position, x_offset + x, y_position + bar_height],
                    fill=accent + (alpha,),
                )

            # Marker at the leading edge (omitted once the bar is full).
            highlight_pos = x_offset + progress_width
            if highlight_pos < x_offset + bar_width:
                draw.rectangle(
                    [
                        highlight_pos - 2,
                        y_position - 1,
                        highlight_pos + 2,
                        y_position + bar_height + 1,
                    ],
                    fill=(255, 255, 255, 200),
                )
        except Exception as e:
            self.logger.error(f"Progress bar drawing failed: {str(e)}")

    def generate_voice_over(self, script: str) -> "AudioClip":
        """Synthesize ``script`` with gTTS and return it as an audio clip.

        If synthesis fails, a silent stereo clip lasting 0.3 s per word is
        returned so video creation can continue.
        """
        if self.use_premium_voice:
            self.logger.warning(
                "Premium voice-over is not implemented; falling back to gTTS"
            )
        try:
            audio_path = self.temp_dir / "voice.mp3"
            tts = gTTS(text=script, lang='en', slow=False)
            tts.save(str(audio_path))
            return AudioFileClip(str(audio_path))
        except Exception as e:
            self.logger.error(f"Voice-over generation failed: {str(e)}")

            def silence(t):
                # moviepy passes either a scalar time or an array of times.
                return np.zeros((np.size(t), 2)) if np.ndim(t) else np.zeros(2)

            return AudioClip(
                silence, duration=len(script.split()) * 0.3, fps=44100
            )

    def create_video(
        self,
        script: str,
        style: str,
        duration: int,
        output_path: str,
    ) -> str:
        """Render the full video and write it to ``output_path``.

        Args:
            script: narration text; words are revealed progressively.
            style: key into ``self.themes``.
            duration: video length in seconds.
            output_path: destination file for the encoded video.

        Returns:
            ``output_path``.

        Raises:
            Exception: any failure is logged and re-raised.
        """
        audio = None
        music_source = None
        try:
            fps = max(1, int(self.fps))
            total_frames = max(1, int(duration * fps))
            theme = self.themes[style]
            words = script.split()  # split once, not once per frame

            assets = self.generate_visual_assets(script, style)
            audio = self.generate_voice_over(script)

            def make_frame(t):
                # Small epsilon so t = i / fps never truncates to i - 1.
                index = min(int(t * fps + 1e-6), total_frames - 1)
                progress = index / total_frames
                current_text = " ".join(words[:int(progress * len(words)) + 1])
                background = None
                if assets:
                    asset = assets[int(progress * len(assets))]
                    if asset['type'] == 'image':
                        background = asset['data']
                return self.create_enhanced_frame(
                    current_text, theme, index, total_frames, background
                )

            # Frames are rendered on demand while encoding. Materializing
            # every full-HD frame up front would need gigabytes of memory for
            # videos of even moderate length.
            video = VideoClip(make_frame, duration=total_frames / fps)
            video = video.set_audio(audio)

            # Add background music (if available)
            if self.include_music:
                try:
                    music_source = AudioFileClip("assets/music/background.mp3")
                    music = music_source.volumex(0.1).audio_loop(
                        duration=video.duration
                    )
                    video = video.set_audio(
                        CompositeAudioClip([video.audio, music])
                    )
                except Exception as e:
                    self.logger.warning(
                        f"Background music addition failed: {str(e)}"
                    )

            video.write_videofile(
                output_path,
                fps=fps,
                codec='libx264',
                audio_codec='aac',
                threads=4,
                preset='medium',
            )
            return output_path
        except Exception as e:
            self.logger.error(f"Video creation failed: {str(e)}")
            raise
        finally:
            # Release audio readers so the temp files can be deleted later.
            for clip in (audio, music_source):
                if clip is None:
                    continue
                try:
                    clip.close()
                except Exception as e:
                    self.logger.debug("Closing audio clip failed: %s", e)

    @staticmethod
    def clean_text(text: str) -> str:
        """Return ``text`` reduced to plain ASCII suitable for the caption.

        Typographic dashes, quotes and ellipses are mapped to ASCII
        look-alikes, accented letters lose their accents, and any remaining
        non-ASCII characters are dropped. Non-string input is converted with
        ``str()``.
        """
        if not isinstance(text, str):
            text = str(text)
        # Map punctuation BEFORE stripping non-ASCII, otherwise these
        # characters would simply be deleted.
        replacements = str.maketrans({
            '\u2013': '-',    # en dash
            '\u2014': '-',    # em dash
            '\u201c': '"',    # left smart quote
            '\u201d': '"',    # right smart quote
            '\u2018': "'",    # left smart apostrophe
            '\u2019': "'",    # right smart apostrophe
            '\u2026': '...',  # ellipsis
        })
        text = text.translate(replacements)
        # Decompose accented characters, then drop everything non-ASCII.
        text = unicodedata.normalize('NFKD', text)
        text = text.encode('ascii', 'ignore').decode('ascii')
        return text.strip()

    def extract_key_topics(self, script: str) -> List[str]:
        """Return up to five distinct topics taken from the script.

        A topic is simply the first two words of each ``.``-separated
        sentence; sentences with fewer than two words are skipped. Returns
        ``["default topic"]`` if extraction raises.
        """
        try:
            # Simple heuristic; a production system would use a proper NLP
            # library here.
            topics = []
            for sentence in script.split('.'):
                words = sentence.strip().split()
                if len(words) >= 2:
                    topics.append(' '.join(words[:2]))
            # dict.fromkeys de-duplicates while keeping first-seen order.
            return list(dict.fromkeys(topics))[:5]
        except Exception as e:
            self.logger.error(f"Topic extraction failed: {str(e)}")
            return ["default topic"]

    def generate_ai_image(self, prompt: str, style: str) -> Optional["Image.Image"]:
        """Generate a 1920x1080 image for ``prompt`` via ``self.stability_api``.

        Returns ``None`` when no client is configured, when the client returns
        nothing, or when generation fails.
        """
        try:
            if not self.stability_api:
                return None
            # Enhance prompt based on style
            style_prompts = {
                'Professional': "professional, corporate, clean, modern",
                'Creative': "artistic, vibrant, innovative, dynamic",
                'Educational': "clear, informative, academic, detailed",
            }
            enhanced_prompt = (
                f"{prompt}, {style_prompts.get(style, '')}, high quality, 4k"
            )
            response = self.stability_api.generate(
                prompt=enhanced_prompt,
                samples=1,
                width=1920,
                height=1080,
            )
            if response and len(response) > 0:
                image_data = response[0].image
                return Image.open(io.BytesIO(image_data))
            return None
        except Exception as e:
            self.logger.error(f"AI image generation failed: {str(e)}")
            return None

    def cleanup(self):
        """Delete the temporary working directory and everything inside it.

        Safe to call more than once; per-entry failures are logged as
        warnings and do not stop the remaining deletions.
        """
        temp_dir = getattr(self, "temp_dir", None)
        if temp_dir is None or not temp_dir.exists():
            return
        try:
            for entry in temp_dir.iterdir():
                try:
                    if entry.is_dir() and not entry.is_symlink():
                        shutil.rmtree(entry)
                    else:
                        entry.unlink()
                except Exception as e:
                    self.logger.warning(f"Failed to delete {entry}: {str(e)}")
            temp_dir.rmdir()
        except Exception as e:
            self.logger.error(f"Cleanup failed: {str(e)}")

    def __enter__(self):
        """Return the generator itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Remove temporary files on leaving the ``with`` block."""
        self.cleanup()
# Streamlit UI Class
class VideoGeneratorUI:
    """Streamlit front end: collects options in a form and runs the generator."""

    # Key under which the generator is kept in Streamlit's session state.
    _SESSION_KEY = "video_generator"

    def __init__(self):
        """Fetch (or create once per session) the generator and draw the page."""
        # Streamlit re-executes the whole script on every interaction, so the
        # generator is kept in session state instead of being rebuilt (models
        # reloaded, new temp directory) on each rerun.
        if self._SESSION_KEY not in st.session_state:
            st.session_state[self._SESSION_KEY] = EnhancedVideoGenerator()
        self.generator = st.session_state[self._SESSION_KEY]
        self.setup_ui()

    def setup_ui(self):
        """Render the input form and, on submit, generate and offer the video."""
        st.title("Enhanced Video Generator")
        st.write("Create professional videos with AI-generated content")

        with st.form("video_generator_form"):
            # Input fields
            prompt = st.text_area(
                "Enter your video topic/prompt",
                height=100,
                help="Describe what you want your video to be about",
            )
            col1, col2 = st.columns(2)
            with col1:
                style = st.selectbox(
                    "Choose style",
                    options=list(self.generator.themes.keys()),
                )
            with col2:
                duration = st.slider(
                    "Video duration (seconds)",
                    min_value=10,
                    max_value=300,
                    value=60,
                    step=10,
                )
            with st.expander("Advanced Options"):
                use_premium_voice = st.checkbox(
                    "Use premium voice-over",
                    value=False,
                    help="Requires ElevenLabs API key",
                )
                include_music = st.checkbox(
                    "Include background music",
                    value=True,
                )
                fps = st.slider(
                    "Frames per second",
                    min_value=24,
                    max_value=60,
                    value=30,
                )
            submit_button = st.form_submit_button("Generate Video")

        # Everything below runs outside the form: download buttons are not
        # allowed inside one.
        if not submit_button:
            return
        if not prompt or not prompt.strip():
            st.error("Please enter a prompt for your video.")
            return

        try:
            with st.spinner("Generating your video..."):
                output_path = f"generated_video_{int(time.time())}.mp4"
                # Hand the advanced options to the generator as plain
                # attributes, the same way the voice option is passed.
                self.generator.use_premium_voice = use_premium_voice
                self.generator.include_music = include_music
                self.generator.fps = fps
                # The prompt text itself is used as the narration script.
                video_path = self.generator.create_video(
                    prompt,
                    style,
                    duration,
                    output_path,
                )
            # Show success message and download button
            st.success("Video generated successfully!")
            with open(video_path, 'rb') as f:
                video_bytes = f.read()
            st.download_button(
                label="Download Video",
                data=video_bytes,
                file_name=output_path,
                mime="video/mp4",
            )
        except Exception as e:
            st.error(f"Failed to generate video: {str(e)}")
            st.error("Please try again with different settings or contact support.")
# Script entry point: constructing the UI object renders the whole page.
if __name__ == "__main__":
    ui = VideoGeneratorUI()