# NOTE: "Spaces: Sleeping / Sleeping" appears to be page-capture residue, not program code.
import io
import logging
import os
import random
import re
import shutil
import tempfile
import textwrap
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple
from urllib.parse import quote

import requests
import streamlit as st
import torch
from bs4 import BeautifulSoup
from gtts import gTTS
from moviepy.editor import *
import numpy as np
from PIL import Image, ImageDraw, ImageFont
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
class ImageScraper:
    """Collect candidate image URLs by scraping stock-photo search pages.

    Scraping is best-effort: a source that cannot be fetched or parsed is
    logged and contributes whatever URLs were gathered so far instead of
    raising.
    """

    # Seconds to wait for a search page. Without a timeout ``requests`` can
    # block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        # Browser-like User-Agent sent with every search request.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }

    @staticmethod
    def _unique(urls: List[str], limit: int) -> List[str]:
        """Return at most *limit* distinct URLs, keeping first-seen order.

        ``dict.fromkeys`` is used instead of ``set`` so the result order is
        deterministic; a non-positive *limit* yields an empty list.
        """
        return list(dict.fromkeys(urls))[:max(limit, 0)]

    def _scrape(self, page_url: str, keep: Callable[[str], bool], source: str) -> List[str]:
        """Fetch *page_url* and return the ``<img src>`` values accepted by *keep*.

        Args:
            page_url: Fully built search-page URL.
            keep: Predicate applied to each ``src`` value.
            source: Human-readable source name used in the log message.

        Returns:
            The matching URLs in document order; possibly empty.
        """
        urls: List[str] = []
        try:
            response = requests.get(
                page_url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )
            # Treat HTTP errors (403, 429, ...) as failures instead of
            # parsing an error page that contains no results.
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            for img in soup.find_all('img', src=True):
                if keep(img['src']):
                    urls.append(img['src'])
        except Exception as e:
            # Deliberately broad: one failing source must not break the
            # overall image search.
            logging.warning(f"{source} scraping error: {e}")
        return urls

    def scrape_pexels(self, query: str) -> List[str]:
        """Return image URLs found on the Pexels search page for *query*."""
        # Percent-encode the whole query so characters such as '/', '?' or
        # '#' cannot alter the URL structure (spaces become %20 as before).
        url = f"https://www.pexels.com/search/{quote(query, safe='')}/"
        return self._scrape(
            url,
            lambda src: 'photos' in src and 'pexels.com' in src,
            "Pexels",
        )

    def scrape_unsplash(self, query: str) -> List[str]:
        """Return image URLs found on the Unsplash search page for *query*."""
        # Spaces are replaced by hyphens in the path, as in the original
        # URL scheme; the remainder is percent-encoded.
        slug = quote(query.replace(' ', '-'), safe='')
        url = f"https://unsplash.com/s/photos/{slug}"
        return self._scrape(
            url,
            lambda src: 'images.unsplash.com' in src,
            "Unsplash",
        )

    def get_images(self, query: str, num_images: int = 15) -> List[str]:
        """Return up to *num_images* distinct image URLs for *query*.

        Pexels results come first, followed by Unsplash results. A blank
        query returns an empty list without touching the network.
        """
        if not query or not query.strip():
            return []
        found = self.scrape_pexels(query) + self.scrape_unsplash(query)
        return self._unique(found, num_images)
class EnhancedVideoGenerator:
    """Build a narrated slideshow video from a script and a set of images.

    The instance owns a temporary working directory (``temp_dir``) and can
    be used as a context manager; leaving the ``with`` block removes that
    directory. A GPT-2 text-generation pipeline is loaded at construction
    time when possible, although no method in this class uses it.
    """

    # Output frame size (width, height) in pixels.
    FRAME_SIZE = (1920, 1080)
    # Number of cross-fade frames inserted between two consecutive images.
    TRANSITION_FRAMES = 15
    # Seconds to wait for a remote image before giving up.
    DOWNLOAD_TIMEOUT = 15

    # Typographic punctuation mapped to ASCII look-alikes. Applied BEFORE
    # non-ASCII characters are dropped; otherwise dashes and curly quotes
    # would simply vanish from captions.
    _PUNCTUATION_MAP = str.maketrans({
        '\u2013': '-',    # en dash
        '\u2014': '-',    # em dash
        '\u201c': '"',    # left double quotation mark
        '\u201d': '"',    # right double quotation mark
        '\u2018': "'",    # left single quotation mark
        '\u2019': "'",    # right single quotation mark / apostrophe
        '\u2026': '...',  # horizontal ellipsis
    })

    def __init__(self):
        """Run every setup step.

        Raises:
            RuntimeError: If any setup step fails; the original exception
                is attached as ``__cause__``.
        """
        try:
            self.setup_logging()
            self.setup_device()
            self.initialize_models()
            self.setup_workspace()
            self.load_assets()
            self.setup_themes()
            self.image_scraper = ImageScraper()
        except Exception as e:
            logging.error(f"Initialization failed: {str(e)}")
            # Chain the cause so the real failure shows up in tracebacks.
            raise RuntimeError("Failed to initialize video generator") from e

    def setup_logging(self):
        """Configure logging for the application"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('video_generator.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def setup_device(self):
        """Set up computing device (CPU/GPU)"""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.logger.info(f"Using device: {self.device}")

    def initialize_models(self):
        """Initialize the AI models, degrading gracefully on failure.

        A failed text-generation load leaves ``text_generator`` as ``None``
        instead of raising. The image model and Stability API client are
        always left unset (``None``).
        """
        try:
            self.text_generator = pipeline(
                'text-generation',
                model='gpt2',
                device=0 if self.device == "cuda" else -1
            )
        except Exception as e:
            self.logger.warning(f"Text generator initialization failed: {str(e)}")
            self.text_generator = None
        # Skip the StableDiffusion model initialization as it requires additional setup
        self.image_model = None
        # No Stability API client is configured here, so generate_ai_image()
        # returns None until one is assigned.
        self.stability_api = None

    def setup_workspace(self):
        """Create the temporary working directory and its ``assets`` folder."""
        self.temp_dir = Path(tempfile.mkdtemp())
        self.asset_dir = self.temp_dir / "assets"
        self.asset_dir.mkdir(exist_ok=True)

    def setup_themes(self):
        """Define the RGB palette (bg / accent / text) for each style name."""
        self.themes = {
            'Professional': {
                'bg': (240, 240, 240),
                'accent': (0, 120, 212),
                'text': (33, 33, 33)
            },
            'Creative': {
                'bg': (255, 250, 240),
                'accent': (255, 123, 0),
                'text': (51, 51, 51)
            },
            'Educational': {
                'bg': (248, 249, 250),
                'accent': (40, 167, 69),
                'text': (33, 37, 41)
            }
        }

    def load_assets(self):
        """Load the caption font, trying several common font locations."""
        try:
            font_options = [
                "arial.ttf",
                "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
                "/System/Library/Fonts/Helvetica.ttc"
            ]
            for font_path in font_options:
                try:
                    self.font = ImageFont.truetype(font_path, 40)
                    break
                except OSError:
                    continue
            else:
                # No candidate could be opened: use Pillow's built-in font.
                self.font = ImageFont.load_default()
                self.logger.warning("Using default font - custom font loading failed")
        except Exception as e:
            self.logger.error(f"Asset loading failed: {str(e)}")

    def generate_visual_assets(self, script: str, style: str) -> List[Dict]:
        """Generate one image asset per key topic of *script*.

        This method used to be defined twice, with the later definition
        silently replacing the earlier one. The two are merged here: an AI
        image is used when ``generate_ai_image`` returns one, otherwise a
        solid background in the theme colour is used (the behaviour that
        was actually in effect). At most three topics are processed.

        Returns:
            A list of ``{'type', 'data', 'topic'}`` dicts, or an empty list
            when generation fails (for example for an unknown *style*).
        """
        try:
            assets = []
            for topic in self.extract_key_topics(script)[:3]:
                image = self.generate_ai_image(topic, style)
                if image is None:
                    image = Image.new('RGB', self.FRAME_SIZE, self.themes[style]['bg'])
                assets.append({
                    'type': 'image',
                    'data': image,
                    'topic': topic
                })
            return assets
        except Exception as e:
            self.logger.error(f"Visual asset generation failed: {str(e)}")
            return []

    def create_enhanced_frame(
        self,
        text: str,
        theme: dict,
        frame_number: int,
        total_frames: int,
        background_image: Optional["Image.Image"] = None,
        size: Tuple[int, int] = (1920, 1080)
    ) -> np.ndarray:
        """Render one captioned frame as an ``(height, width, 3)`` uint8 array.

        Args:
            text: Caption text; cleaned and wrapped at 50 characters.
            theme: Palette dict providing at least ``'bg'`` and ``'accent'``.
            frame_number: Index of this frame, used for the progress bar.
            total_frames: Total number of frames, used for the progress bar.
            background_image: Optional picture stretched to *size*; when
                omitted the theme background colour is used.
            size: Frame size as ``(width, height)``.

        Returns:
            The rendered RGB frame, or a plain background-coloured frame if
            rendering fails.
        """
        try:
            if background_image is not None:
                # Normalise to RGB so palette/greyscale/alpha inputs all work.
                base = background_image.convert('RGB').resize(
                    size, Image.Resampling.LANCZOS
                )
            else:
                base = Image.new('RGB', size, tuple(theme['bg']))

            # Semi-transparent white wash over the whole frame.
            overlay = Image.new('RGBA', size, (255, 255, 255, 100))
            img = Image.alpha_composite(base.convert('RGBA'), overlay).convert('RGB')

            # The drawing context must be created AFTER compositing:
            # alpha_composite returns a new image, so a context bound to the
            # earlier image would draw onto a discarded object. An RGB image
            # with an 'RGBA' draw mode lets Pillow blend translucent fills.
            draw = ImageDraw.Draw(img, 'RGBA')

            wrapped_text = textwrap.fill(self.clean_text(text), width=50)
            if wrapped_text:
                text_bbox = draw.textbbox((0, 0), wrapped_text, font=self.font)
                text_width = text_bbox[2] - text_bbox[0]
                text_height = text_bbox[3] - text_bbox[1]
                text_x = (size[0] - text_width) // 2
                text_y = size[1] - text_height - 100  # Position at bottom

                padding = 20
                draw.rectangle(
                    [
                        text_x - padding,
                        text_y - padding,
                        text_x + text_width + padding,
                        text_y + text_height + padding
                    ],
                    fill=(0, 0, 0, 160)  # Semi-transparent black
                )
                draw.text(
                    (text_x, text_y),
                    wrapped_text,
                    fill=(255, 255, 255, 255),
                    font=self.font
                )

            self.draw_animated_progress_bar(
                draw,
                frame_number,
                total_frames,
                size,
                theme
            )
            return np.array(img)
        except Exception as e:
            self.logger.error(f"Frame creation failed: {str(e)}")
            # Fallback frame has the same 3-channel layout as the normal result.
            return np.full((size[1], size[0], 3), theme['bg'], dtype=np.uint8)

    def draw_animated_progress_bar(
        self,
        draw: "ImageDraw.ImageDraw",
        frame_number: int,
        total_frames: int,
        size: Tuple[int, int],
        theme: dict
    ):
        """Draw a progress bar near the bottom edge of the frame.

        The bar spans 80% of the frame width; its filled part is
        proportional to ``frame_number / total_frames`` (clamped to 0..1)
        and fades in from left to right. Drawing errors are logged, never
        raised.
        """
        try:
            # Guard against total_frames == 0 and out-of-range frame numbers.
            if total_frames > 0:
                progress = min(max(frame_number / total_frames, 0.0), 1.0)
            else:
                progress = 0.0
            bar_width = int(size[0] * 0.8)  # 80% of screen width
            bar_height = 6
            x_offset = (size[0] - bar_width) // 2
            y_position = size[1] - 40

            # Background track.
            draw.rectangle(
                [x_offset, y_position, x_offset + bar_width, y_position + bar_height],
                fill=(200, 200, 200, 160)
            )

            # Filled part, one column at a time with increasing opacity.
            accent = tuple(theme['accent'][:3])
            progress_width = int(bar_width * progress)
            for x in range(progress_width):
                alpha = int(255 * (x / bar_width))
                draw.line(
                    [x_offset + x, y_position, x_offset + x, y_position + bar_height],
                    fill=accent + (alpha,)
                )

            # Bright marker at the leading edge until the bar is full.
            highlight_pos = x_offset + progress_width
            if highlight_pos < x_offset + bar_width:
                draw.rectangle(
                    [highlight_pos - 2, y_position - 1, highlight_pos + 2, y_position + bar_height + 1],
                    fill=(255, 255, 255, 200)
                )
        except Exception as e:
            self.logger.error(f"Progress bar drawing failed: {str(e)}")

    @staticmethod
    def _silent_audio_frame(t):
        """Return stereo silence for a scalar time or an array of times."""
        if np.ndim(t):
            return np.zeros((np.size(t), 2))
        return np.zeros(2)

    def generate_voice_over(self, script: str) -> "AudioClip":
        """Generate voice-over audio for *script* using gTTS.

        Returns:
            An ``AudioFileClip`` of the synthesised speech. If synthesis
            fails, a silent clip whose length is estimated at 0.3 s per
            word (at least one second) is returned instead, so video
            creation can continue.
        """
        try:
            audio_path = self.temp_dir / "voice.mp3"
            tts = gTTS(
                text=script,
                lang='en',
                slow=False
            )
            tts.save(str(audio_path))
            return AudioFileClip(str(audio_path))
        except Exception as e:
            self.logger.error(f"Voice-over generation failed: {str(e)}")
            # AudioFileClip cannot be built without a file, so the fallback
            # is a generated silent clip rather than AudioFileClip(duration=...).
            fallback_duration = max(len(script.split()) * 0.3, 1.0)
            return AudioClip(
                self._silent_audio_frame, duration=fallback_duration, fps=44100
            )

    def _fetch_frame(self, img_url: str) -> np.ndarray:
        """Download *img_url* and return it as an RGB array of FRAME_SIZE."""
        response = requests.get(img_url, timeout=self.DOWNLOAD_TIMEOUT)
        response.raise_for_status()
        with Image.open(BytesIO(response.content)) as img:
            # Convert to RGB so every frame has the same (H, W, 3) shape;
            # mixed modes would break blending and clip assembly.
            rgb = img.convert('RGB').resize(self.FRAME_SIZE, Image.Resampling.LANCZOS)
        return np.array(rgb)

    def _build_frames(self, images: List[np.ndarray], total_frames: int) -> List[np.ndarray]:
        """Lay out still and cross-fade frames for *images*.

        The cross-fade frames are taken out of the *total_frames* budget so
        the clip length matches the requested duration instead of
        overshooting it. Each image is held for at least one frame.
        """
        gaps = len(images) - 1
        hold = max(1, (total_frames - gaps * self.TRANSITION_FRAMES) // len(images))
        frames = []
        for idx, img in enumerate(images):
            # The same array object is repeated; no pixel data is copied.
            frames.extend([img] * hold)
            if idx < gaps:
                next_img = images[idx + 1]
                for alpha in np.linspace(0, 1, self.TRANSITION_FRAMES):
                    blended = (1 - alpha) * img + alpha * next_img
                    frames.append(blended.astype(np.uint8))
        return frames

    def create_video(self, script: str, style: str, duration: int, output_path: str, selected_images: List[str]) -> str:
        """Create a narrated slideshow video from the selected images.

        Args:
            script: Text spoken as the voice-over.
            style: Theme name; accepted for interface compatibility and not
                used by this method.
            duration: Target video length in seconds.
            output_path: Destination path of the MP4 file.
            selected_images: URLs of the images to show, in order.

        Returns:
            *output_path* once the file has been written.

        Raises:
            ValueError: If *selected_images* is empty.
            Exception: Any download, decoding or encoding error is logged
                and re-raised.
        """
        try:
            if not selected_images:
                # Previously this surfaced as an opaque ZeroDivisionError.
                raise ValueError("At least one image is required to create a video")

            progress_bar = st.progress(0)
            status_text = st.empty()

            # Generate voice-over (20%)
            status_text.text("Creating voice-over...")
            audio = self.generate_voice_over(script)
            video = None
            try:
                progress_bar.progress(20)

                # Process selected images (40%)
                status_text.text("Processing images...")
                processed_images = [self._fetch_frame(url) for url in selected_images]
                progress_bar.progress(40)

                # Create frames with transitions
                fps = 30
                total_frames = int(duration * fps)
                status_text.text("Generating frames...")
                frames = self._build_frames(processed_images, total_frames)
                progress_bar.progress(70)

                # Create video clip
                status_text.text("Compiling video...")
                video = ImageSequenceClip(frames, fps=fps)
                video = video.set_audio(audio)
                progress_bar.progress(90)

                # Write final video
                status_text.text("Saving video...")
                video.write_videofile(
                    output_path,
                    fps=fps,
                    codec='libx264',
                    audio_codec='aac',
                    threads=4,
                    preset='ultrafast'
                )
            finally:
                # Release clip resources so the temporary audio file can be
                # deleted by cleanup().
                if video is not None:
                    video.close()
                audio.close()

            progress_bar.progress(100)
            status_text.text("Video generation complete!")
            return output_path
        except Exception as e:
            self.logger.error(f"Video creation failed: {str(e)}")
            raise

    @staticmethod
    def clean_text(text: str) -> str:
        """Clean and normalize *text* to plain ASCII for display.

        Declared as a static method so both ``self.clean_text(...)`` and
        ``EnhancedVideoGenerator.clean_text(...)`` work; the previous
        definition had no ``self`` parameter and failed when called on an
        instance.

        Typographic dashes, quotes and ellipses are replaced by ASCII
        equivalents, accented letters lose their accents via NFKD
        decomposition, and any remaining non-ASCII character is dropped.
        """
        if not isinstance(text, str):
            text = str(text)
        # Replace look-alike punctuation first, while it is still present.
        text = text.translate(EnhancedVideoGenerator._PUNCTUATION_MAP)
        # Decompose accented characters so the base letter survives.
        text = unicodedata.normalize('NFKD', text)
        # Drop whatever is still outside ASCII.
        text = text.encode('ascii', 'ignore').decode('ascii')
        return text.strip()

    def extract_key_topics(self, script: str) -> List[str]:
        """Extract up to five topics from *script*.

        A topic is simply the first two words of each period-separated
        sentence; duplicates are removed while keeping first-seen order.
        Returns ``["default topic"]`` if extraction raises.
        """
        try:
            topics = []
            for sentence in script.split('.'):
                words = sentence.strip().split()
                if len(words) >= 2:
                    topics.append(' '.join(words[:2]))
            # Remove duplicates and limit to top 5 topics
            return list(dict.fromkeys(topics))[:5]
        except Exception as e:
            self.logger.error(f"Topic extraction failed: {str(e)}")
            return ["default topic"]

    def generate_ai_image(self, prompt: str, style: str) -> Optional["Image.Image"]:
        """Generate an image for *prompt* through ``self.stability_api``.

        Returns:
            The generated image, or ``None`` when no API client is
            configured, the response is empty, or the call fails.
        """
        try:
            if not self.stability_api:
                return None
            # Style-specific keywords appended to the prompt.
            style_prompts = {
                'Professional': "professional, corporate, clean, modern",
                'Creative': "artistic, vibrant, innovative, dynamic",
                'Educational': "clear, informative, academic, detailed"
            }
            enhanced_prompt = f"{prompt}, {style_prompts.get(style, '')}, high quality, 4k"
            response = self.stability_api.generate(
                prompt=enhanced_prompt,
                samples=1,
                width=1920,
                height=1080
            )
            if response and len(response) > 0:
                image_data = response[0].image
                return Image.open(io.BytesIO(image_data))
            return None
        except Exception as e:
            self.logger.error(f"AI image generation failed: {str(e)}")
            return None

    def cleanup(self):
        """Remove the temporary working directory and everything in it.

        Safe to call more than once: a directory that is already gone is
        ignored. Individual deletion failures are logged as warnings.
        """
        try:
            if not self.temp_dir.exists():
                return
            for entry in self.temp_dir.glob('*'):
                try:
                    if entry.is_file():
                        entry.unlink()
                    elif entry.is_dir():
                        shutil.rmtree(entry)
                except Exception as e:
                    self.logger.warning(f"Failed to delete {entry}: {str(e)}")
            self.temp_dir.rmdir()
        except Exception as e:
            self.logger.error(f"Cleanup failed: {str(e)}")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.cleanup()
# Streamlit UI Class
class VideoGeneratorUI:
    """Streamlit front end: prompt entry, image selection, video generation.

    Constructing the object builds an ``EnhancedVideoGenerator`` and renders
    the whole page.
    """

    # Seconds to wait for each preview image before skipping it.
    PREVIEW_TIMEOUT = 15

    def __init__(self):
        # NOTE(review): a new generator (model load + temp directory) is
        # created on every construction and never cleaned up here - confirm
        # whether it should be cached or closed.
        self.generator = EnhancedVideoGenerator()
        self.setup_ui()

    def setup_ui(self):
        """Render the three-step page and react to user input."""
        st.title("Enhanced Video Generator")

        # Step 1: Input prompt
        prompt = st.text_input("Enter your video topic/prompt")
        if not prompt:
            return

        # Step 2: Image Selection
        st.subheader("Select Images for Your Video")
        images = self.generator.image_scraper.get_images(prompt)
        if not images:
            st.warning("No images found. Try a different search term.")
            return

        selected_images = []
        cols = st.columns(3)
        for idx, img_url in enumerate(images):
            with cols[idx % 3]:
                try:
                    response = requests.get(img_url, timeout=self.PREVIEW_TIMEOUT)
                    response.raise_for_status()
                    img = Image.open(BytesIO(response.content))
                    st.image(img, use_column_width=True)
                    if st.checkbox(f"Select Image {idx + 1}", key=f"img_{idx}"):
                        selected_images.append(img_url)
                except (requests.RequestException, OSError, ValueError) as e:
                    # Skip images that cannot be downloaded or decoded, but
                    # no longer swallow unrelated errors (a bare ``except``
                    # also caught KeyboardInterrupt and SystemExit).
                    logging.getLogger(__name__).warning(
                        f"Skipping preview for {img_url}: {e}"
                    )
                    continue

        # Step 3: Video Generation (only show if images are selected)
        if not selected_images:
            return

        st.subheader("Video Generation Settings")
        col1, col2 = st.columns(2)
        with col1:
            style = st.selectbox("Choose style", options=list(self.generator.themes.keys()))
        with col2:
            duration = st.slider("Video duration (seconds)", 10, 300, 60, 10)

        if st.button("Generate Video"):
            try:
                # Timestamped name so repeated runs do not overwrite each other.
                output_path = os.path.join(os.getcwd(), f"generated_video_{int(time.time())}.mp4")
                video_path = self.generator.create_video(prompt, style, duration, output_path, selected_images)
                if os.path.exists(video_path):
                    st.success("Video generated successfully!")
                    st.video(video_path)
                    with open(video_path, 'rb') as video_file:
                        st.download_button(
                            "Download Video",
                            video_file.read(),
                            file_name=os.path.basename(video_path),
                            mime="video/mp4"
                        )
            except Exception as e:
                # Top-level UI boundary: report the failure to the user.
                st.error(f"Failed to generate video: {str(e)}")
# Build and render the UI when this file is executed as a script.
if __name__ == "__main__":
    ui = VideoGeneratorUI()