# Spaces: Sleeping
# (Page-status text captured together with this file; it is not part of the program.)
import io
import logging
import os
import random
import re
import shutil
import tempfile
import textwrap
import time
import unicodedata
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote

# The star import comes first among third-party imports so that the explicit
# imports below win if moviepy.editor happens to export a clashing name.
from moviepy.editor import *
import numpy as np
import requests
import streamlit as st
import torch
from bs4 import BeautifulSoup
from gtts import gTTS
from PIL import Image, ImageDraw, ImageFont
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
class ImageScraper:
    """Find image URLs for a video script and provide a gTTS narration fallback.

    Image search goes to the Pixabay API. Whenever that search cannot be used
    (no API key, network error, non-200 status, no hits) a fixed list of stock
    photo URLs is returned instead, so callers always receive some URLs.

    The Pixabay API key is read from the ``PIXABAY_API_KEY`` environment
    variable; credentials must not be committed to source control.
    """

    logger = logging.getLogger(__name__)

    PIXABAY_API_URL = "https://pixabay.com/api/"
    # Pixabay rejects search terms longer than 100 characters.
    MAX_QUERY_LENGTH = 100
    REQUEST_TIMEOUT = 10  # seconds; requests has no default timeout
    FALLBACK_SEARCH_TERMS = (
        "digital security",
        "technology",
        "cyber security",
        "data protection",
    )
    STOCK_IMAGE_URLS = (
        "https://images.pexels.com/photos/60504/security-protection-anti-virus-software-60504.jpeg",
        "https://images.pexels.com/photos/5380642/pexels-photo-5380642.jpeg",
        "https://images.pexels.com/photos/2582937/pexels-photo-2582937.jpeg",
        "https://images.pexels.com/photos/7319074/pexels-photo-7319074.jpeg",
        "https://images.pexels.com/photos/4164418/pexels-photo-4164418.jpeg",
        "https://images.pexels.com/photos/3861969/pexels-photo-3861969.jpeg",
        "https://images.pexels.com/photos/5473298/pexels-photo-5473298.jpeg",
        "https://images.pexels.com/photos/4348401/pexels-photo-4348401.jpeg",
        "https://images.pexels.com/photos/8386440/pexels-photo-8386440.jpeg",
        "https://images.pexels.com/photos/5473950/pexels-photo-5473950.jpeg",
    )

    def __init__(self):
        """Read the API key from the environment and create a scratch directory."""
        self.PIXABAY_API_KEY = os.environ.get("PIXABAY_API_KEY", "")
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        self.temp_dir = Path(tempfile.mkdtemp())

    def get_pixabay_images(self, query: str) -> List[str]:
        """Return ``largeImageURL`` values from Pixabay for ``query``.

        Falls back to :meth:`get_stock_images` on any failure, so the result
        is never empty.
        """
        if not self.PIXABAY_API_KEY:
            self.logger.warning(
                "PIXABAY_API_KEY is not set; using stock images instead"
            )
            return self.get_stock_images()

        # Collapse whitespace and respect the API's length limit. The raw
        # term is passed to requests, which does the URL encoding itself;
        # pre-replacing spaces with '+' would send a literal '%2B'.
        search_term = " ".join(query.split())[:self.MAX_QUERY_LENGTH]
        params = {
            'key': self.PIXABAY_API_KEY,
            'q': search_term,
            'image_type': 'photo',
            'per_page': 20,
            'safesearch': 'true',
            'lang': 'en',
        }
        try:
            response = requests.get(
                self.PIXABAY_API_URL,
                params=params,
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as e:
            self.logger.warning("Exception in get_pixabay_images: %s", e)
            return self.get_stock_images()

        # response.url is deliberately not logged: it contains the API key.
        if response.status_code != 200:
            self.logger.warning(
                "Pixabay API error: Status code %s", response.status_code
            )
            return self.get_stock_images()

        try:
            data = response.json()
        except ValueError as e:
            self.logger.warning("Pixabay returned invalid JSON: %s", e)
            return self.get_stock_images()

        hits = data.get('hits') or []
        image_urls = [
            hit['largeImageURL'] for hit in hits if hit.get('largeImageURL')
        ]
        if not image_urls:
            self.logger.info("No images found in response")
            return self.get_stock_images()

        self.logger.info("Found %d images", len(image_urls))
        return image_urls

    def get_stock_images(self) -> List[str]:
        """Return a fresh list of the preset stock image URLs."""
        return list(self.STOCK_IMAGE_URLS)

    def get_images(self, query: str, num_images: int = 15) -> List[str]:
        """Return up to ``num_images`` unique image URLs for ``query``.

        Tries the query itself, then a few generic technology terms, then the
        preset stock images.
        """
        images = self.get_pixabay_images(query)

        if not images:
            for term in self.FALLBACK_SEARCH_TERMS:
                images.extend(self.get_pixabay_images(term))
                if images:
                    break

        if not images:
            images = self.get_stock_images()

        # dict.fromkeys removes duplicates while keeping the original order.
        return list(dict.fromkeys(images))[:max(0, num_images)]

    def verify_image_url(self, url: str) -> bool:
        """Return True when a HEAD request for ``url`` ends in HTTP 200."""
        try:
            response = requests.head(url, timeout=5, allow_redirects=True)
        except requests.RequestException:
            return False
        return response.status_code == 200

    def generate_fallback_audio(self, script: str) -> "AudioFileClip":
        """Narrate ``script`` with gTTS, or return silence if that fails.

        The silent clip lasts 0.3 seconds per word (at least one word's
        worth) so that a video can still be assembled.
        """
        audio_path = self.temp_dir / "voice.mp3"
        try:
            gTTS(text=script, lang='en', slow=False).save(str(audio_path))
            return AudioFileClip(str(audio_path))
        except Exception as e:
            # Best-effort boundary: gTTS can fail on network errors or empty
            # text, and the MP3 may be unreadable. None of that should abort
            # video generation.
            self.logger.warning("Fallback audio generation failed: %s", e)
            duration = max(len(script.split()), 1) * 0.3
            # AudioFileClip cannot be built without a file; generate silence.
            return AudioClip(lambda t: 0 * t, duration=duration, fps=44100)

    def scrape_pexels(self, query: str) -> List[str]:
        """Scrape photo URLs from the Pexels search page for ``query``."""
        urls = []
        try:
            response = requests.get(
                f"https://www.pexels.com/search/{quote(query)}/",
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            self.logger.warning("Pexels scraping error: %s", e)
            return urls

        soup = BeautifulSoup(response.text, 'html.parser')
        for img in soup.find_all('img', {'data-image-width': True}):
            src = img.get('src')
            if src and 'photos' in src:
                urls.append(src)
        return urls

    def scrape_unsplash(self, query: str) -> List[str]:
        """Scrape photo URLs from the Unsplash search page for ``query``."""
        urls = []
        try:
            slug = quote(query.replace(' ', '-'))
            response = requests.get(
                f"https://unsplash.com/s/photos/{slug}",
                headers=self.headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            response.raise_for_status()
        except requests.RequestException as e:
            self.logger.warning("Unsplash scraping error: %s", e)
            return urls

        soup = BeautifulSoup(response.text, 'html.parser')
        for img in soup.find_all('img', {'srcset': True}):
            src = img.get('src')
            if src and 'images.unsplash.com' in src:
                urls.append(src)
        return urls
class EnhancedVideoGenerator:
    """Turn a script and a set of image URLs into a narrated slideshow video.

    Narration uses the ElevenLabs text-to-speech API when the
    ``ELEVEN_LABS_API_KEY`` environment variable is set, and gTTS otherwise.
    Instances can be used as context managers; leaving the ``with`` block
    removes the temporary working directories.
    """

    FRAME_SIZE = (1920, 1080)
    TRANSITION_FRAMES = 15  # cross-fade frames between two consecutive images
    REQUEST_TIMEOUT = 30  # seconds; requests has no default timeout
    ELEVEN_LABS_TTS_URL = (
        "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM"
    )

    def __init__(self):
        """Run all setup steps.

        Raises:
            RuntimeError: if any setup step fails (the cause is chained).
        """
        # Credentials come from the environment, never from source control.
        self.ELEVEN_LABS_API_KEY = os.environ.get("ELEVEN_LABS_API_KEY", "")
        try:
            self.setup_logging()
            self.setup_device()
            self.initialize_models()
            self.setup_workspace()
            self.load_assets()
            self.setup_themes()
            self.image_scraper = ImageScraper()
        except Exception as e:
            logging.error(f"Initialization failed: {str(e)}")
            raise RuntimeError("Failed to initialize video generator") from e

    def setup_logging(self):
        """Configure root logging (file + console) and create ``self.logger``."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('video_generator.log'),
                logging.StreamHandler(),
            ],
        )
        self.logger = logging.getLogger(__name__)

    def setup_device(self):
        """Select ``"cuda"`` when torch reports a GPU, otherwise ``"cpu"``."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.logger.info("Using device: %s", self.device)

    def initialize_models(self):
        """Load the GPT-2 text-generation pipeline; degrade to ``None`` on failure."""
        try:
            self.text_generator = pipeline(
                'text-generation',
                model='gpt2',
                device=0 if self.device == "cuda" else -1,
            )
        except Exception as e:
            # Deliberately non-fatal: the generator works without this model.
            self.logger.warning("Text generator initialization failed: %s", e)
            self.text_generator = None

        # No image model or Stability client is configured here.
        self.image_model = None
        self.stability_api = None

    def setup_workspace(self):
        """Create the temporary working directory and its ``assets`` subfolder."""
        self.temp_dir = Path(tempfile.mkdtemp())
        self.asset_dir = self.temp_dir / "assets"
        self.asset_dir.mkdir(exist_ok=True)

    def setup_themes(self):
        """Define the RGB colour palette for each supported style."""
        self.themes = {
            'Professional': {
                'bg': (240, 240, 240),
                'accent': (0, 120, 212),
                'text': (33, 33, 33),
            },
            'Creative': {
                'bg': (255, 250, 240),
                'accent': (255, 123, 0),
                'text': (51, 51, 51),
            },
            'Educational': {
                'bg': (248, 249, 250),
                'accent': (40, 167, 69),
                'text': (33, 37, 41),
            },
        }

    def load_assets(self):
        """Load the first available TrueType font, else Pillow's default font."""
        font_options = [
            "arial.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            "/System/Library/Fonts/Helvetica.ttc",
        ]
        for font_path in font_options:
            try:
                self.font = ImageFont.truetype(font_path, 40)
                break
            except OSError:
                continue
        else:
            # Reached only when no candidate could be loaded.
            self.font = ImageFont.load_default()
            self.logger.warning("Using default font - custom font loading failed")

    def generate_visual_assets(self, script: str, style: str) -> List[Dict]:
        """Return one plain theme-coloured background per extracted topic.

        At most three topics are used. Returns an empty list when the style
        is unknown or anything else goes wrong.
        """
        try:
            background = self.themes[style]['bg']
            return [
                {
                    'type': 'image',
                    'data': Image.new('RGB', self.FRAME_SIZE, background),
                    'topic': topic,
                }
                for topic in self.extract_key_topics(script)[:3]
            ]
        except Exception as e:
            self.logger.error("Visual asset generation failed: %s", e)
            return []

    def create_enhanced_frame(
        self,
        text: str,
        theme: dict,
        frame_number: int,
        total_frames: int,
        background_image: "Optional[Image.Image]" = None,
        size: Tuple[int, int] = (1920, 1080),
    ) -> np.ndarray:
        """Render one RGB frame: background, caption box and progress bar.

        Returns:
            A ``(height, width, 3)`` uint8 array. On any drawing failure a
            plain frame in the theme background colour is returned instead.
        """
        try:
            if background_image is not None:
                # Plain resize: the aspect ratio is not preserved.
                base = background_image.convert('RGB').resize(size, Image.LANCZOS)
            else:
                base = Image.new('RGB', size, tuple(theme['bg']))

            # Lighten the background with a semi-transparent white wash.
            wash = Image.new('RGBA', size, (255, 255, 255, 100))
            img = Image.alpha_composite(base.convert('RGBA'), wash).convert('RGB')

            # The drawing context must be created on the composited image;
            # alpha_composite returns a new image. An 'RGBA' context on an
            # RGB image blends semi-transparent fills into the picture.
            draw = ImageDraw.Draw(img, 'RGBA')

            wrapped_text = textwrap.fill(self.clean_text(text), width=50)
            left, top, right, bottom = draw.textbbox(
                (0, 0), wrapped_text, font=self.font
            )
            text_width = right - left
            text_height = bottom - top
            text_x = (size[0] - text_width) // 2
            text_y = size[1] - text_height - 100  # caption sits near the bottom

            padding = 20
            draw.rectangle(
                [
                    text_x - padding,
                    text_y - padding,
                    text_x + text_width + padding,
                    text_y + text_height + padding,
                ],
                fill=(0, 0, 0, 160),
            )
            draw.text(
                (text_x, text_y),
                wrapped_text,
                fill=(255, 255, 255, 255),
                font=self.font,
            )
            self.draw_animated_progress_bar(
                draw, frame_number, total_frames, size, theme
            )
            return np.array(img)
        except Exception as e:
            self.logger.error("Frame creation failed: %s", e)
            return np.full((size[1], size[0], 3), theme['bg'], dtype=np.uint8)

    def draw_animated_progress_bar(
        self,
        draw: "ImageDraw.ImageDraw",
        frame_number: int,
        total_frames: int,
        size: Tuple[int, int],
        theme: dict,
    ):
        """Draw a progress bar showing ``frame_number / total_frames``."""
        try:
            if total_frames <= 0:
                return
            progress = min(max(frame_number / total_frames, 0.0), 1.0)
            bar_width = int(size[0] * 0.8)
            bar_height = 6
            x_offset = (size[0] - bar_width) // 2
            y_position = size[1] - 40

            # Track
            draw.rectangle(
                [x_offset, y_position, x_offset + bar_width, y_position + bar_height],
                fill=(200, 200, 200, 160),
            )

            # Filled part: opacity grows from left to right.
            red, green, blue = theme['accent'][:3]
            progress_width = int(bar_width * progress)
            for x in range(progress_width):
                alpha = int(255 * (x / bar_width))
                draw.line(
                    [x_offset + x, y_position, x_offset + x, y_position + bar_height],
                    fill=(red, green, blue, alpha),
                )

            # Bright marker at the current position.
            highlight_pos = x_offset + progress_width
            if highlight_pos < x_offset + bar_width:
                draw.rectangle(
                    [
                        highlight_pos - 2,
                        y_position - 1,
                        highlight_pos + 2,
                        y_position + bar_height + 1,
                    ],
                    fill=(255, 255, 255, 200),
                )
        except Exception as e:
            self.logger.error("Progress bar drawing failed: %s", e)

    def generate_voice_over(self, script: str) -> "AudioFileClip":
        """Synthesize narration with ElevenLabs, falling back to gTTS.

        The fallback is used when no API key is configured, when the request
        fails, or when the API answers with a non-200 status.
        """
        if not self.ELEVEN_LABS_API_KEY:
            self.logger.info("ELEVEN_LABS_API_KEY is not set; using gTTS")
            return self.generate_fallback_audio(script)

        audio_path = self.temp_dir / "voice.mp3"
        payload = {
            "text": script,
            "model_id": "eleven_monolingual_v1",
            "voice_settings": {
                "stability": 0.75,
                "similarity_boost": 0.75,
            },
        }
        try:
            response = requests.post(
                self.ELEVEN_LABS_TTS_URL,
                headers={
                    "xi-api-key": self.ELEVEN_LABS_API_KEY,
                    "Content-Type": "application/json",
                },
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                audio_path.write_bytes(response.content)
                return AudioFileClip(str(audio_path))
            self.logger.warning(
                "Voice generation error: ElevenLabs returned status %s",
                response.status_code,
            )
        except Exception as e:
            # Best-effort boundary: any failure here leads to the fallback.
            self.logger.warning("Voice generation error: %s", e)
        return self.generate_fallback_audio(script)

    def generate_fallback_audio(self, script: str) -> "AudioFileClip":
        """Narrate ``script`` with gTTS, or return silence if that fails.

        The silent clip lasts 0.3 seconds per word (at least one word's
        worth) so that a video can still be assembled.
        """
        audio_path = self.temp_dir / "voice.mp3"
        try:
            gTTS(text=script, lang='en', slow=False).save(str(audio_path))
            return AudioFileClip(str(audio_path))
        except Exception as e:
            self.logger.warning("Fallback audio generation failed: %s", e)
            duration = max(len(script.split()), 1) * 0.3
            # AudioFileClip cannot be built without a file; generate silence.
            return AudioClip(lambda t: 0 * t, duration=duration, fps=44100)

    def generate_subtitles(self, script: str, duration: int) -> str:
        """Write an SRT file that spreads ``script`` evenly over ``duration``.

        Words are grouped into captions of roughly three seconds each, with
        at least one word per caption.

        Returns:
            Path of the written ``subtitles.srt`` file.

        Raises:
            ValueError: if ``duration`` is not positive.
        """
        if duration <= 0:
            raise ValueError("duration must be positive")

        subtitle_path = self.temp_dir / "subtitles.srt"
        words = script.split()
        entries = []
        if words:
            seconds_per_word = duration / len(words)
            # A short script gives less than one word per 3 s; a step of 0
            # would make range() raise, so use at least one word per caption.
            words_per_subtitle = max(1, int(3 * len(words) / duration))
            starts = range(0, len(words), words_per_subtitle)
            for number, start in enumerate(starts, start=1):
                chunk = words[start:start + words_per_subtitle]
                begin = self.format_time(start * seconds_per_word)
                finish = self.format_time((start + len(chunk)) * seconds_per_word)
                entries.append(
                    f"{number}\n{begin} --> {finish}\n{' '.join(chunk)}\n\n"
                )
        subtitle_path.write_text("".join(entries), encoding="utf-8")
        return str(subtitle_path)

    @staticmethod
    def format_time(seconds: float) -> str:
        """Format ``seconds`` as an SRT timestamp ``HH:MM:SS,mmm``."""
        # Round to whole milliseconds first; truncating the fractional part
        # separately loses a millisecond to float error (e.g. 1.001).
        total_ms = max(0, int(round(seconds * 1000)))
        hours, remainder = divmod(total_ms, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        secs, msecs = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{secs:02d},{msecs:03d}"

    def create_video(self, script: str, style: str, duration: int, output_path: str, selected_images: List[str]) -> str:
        """Build a narrated slideshow from ``selected_images``.

        Progress is reported through Streamlit widgets. Images that cannot be
        downloaded or decoded are skipped.

        Returns:
            ``output_path`` once the file has been written.

        Raises:
            ValueError: if ``duration`` is not positive or no image is usable.
            Exception: any other failure is logged and re-raised.
        """
        try:
            if duration <= 0:
                raise ValueError("duration must be positive")
            if not selected_images:
                raise ValueError("At least one image is required to create a video")

            progress_bar = st.progress(0)
            status_text = st.empty()

            status_text.text("Creating voice-over...")
            audio = self.generate_voice_over(script)
            progress_bar.progress(20)

            status_text.text("Processing images...")
            downloaded = (self._load_image(url) for url in selected_images)
            processed_images = [img for img in downloaded if img is not None]
            if not processed_images:
                raise ValueError("None of the selected images could be loaded")
            progress_bar.progress(40)

            status_text.text("Generating frames...")
            fps = 30
            frames = self._build_frames(
                processed_images, int(duration * fps), self.TRANSITION_FRAMES
            )
            progress_bar.progress(70)

            status_text.text("Compiling video...")
            video = ImageSequenceClip(frames, fps=fps)
            # Narration longer than the slideshow is trimmed to fit.
            if audio.duration and audio.duration > video.duration:
                audio = audio.subclip(0, video.duration)
            video = video.set_audio(audio)
            progress_bar.progress(90)

            status_text.text("Saving video...")
            video.write_videofile(
                output_path,
                fps=fps,
                codec='libx264',
                audio_codec='aac',
                threads=4,
                preset='ultrafast',
            )
            progress_bar.progress(100)
            status_text.text("Video generation complete!")
            return output_path
        except Exception as e:
            self.logger.error("Video creation failed: %s", e)
            raise

    def _load_image(self, url: str) -> Optional[np.ndarray]:
        """Download ``url`` and return it as a frame-sized RGB array, or None."""
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            with Image.open(BytesIO(response.content)) as raw:
                # Normalise to RGB so that every frame has the same shape.
                frame = raw.convert('RGB').resize(self.FRAME_SIZE, Image.LANCZOS)
            return np.array(frame)
        except (requests.RequestException, OSError) as e:
            self.logger.warning("Skipping image %s: %s", url, e)
            return None

    @staticmethod
    def _build_frames(images: List[np.ndarray], total_frames: int, transition_frames: int = 15) -> List[np.ndarray]:
        """Return the frame sequence: each image held, with cross-fades between."""
        count = len(images)
        # Cross-fade frames are taken out of the frame budget so the clip
        # does not run longer than requested.
        fade_total = transition_frames * (count - 1)
        hold = max(1, (total_frames - fade_total) // count)

        frames = []
        for idx, img in enumerate(images):
            frames.extend([img] * hold)
            if idx < count - 1:
                next_img = images[idx + 1]
                for alpha in np.linspace(0, 1, transition_frames):
                    blended = (1 - alpha) * img + alpha * next_img
                    frames.append(blended.astype(np.uint8))
        return frames

    @staticmethod
    def clean_text(text: str) -> str:
        """Return ``text`` reduced to plain ASCII suitable for drawing.

        Typographic dashes, quotes and ellipses are mapped to ASCII
        equivalents, accented letters lose their accents, and any other
        non-ASCII character is dropped.
        """
        if not isinstance(text, str):
            text = str(text)

        # This must happen before the ASCII filter below, which would
        # otherwise delete these characters.
        replacements = {
            '\u2013': '-',    # en dash
            '\u2014': '-',    # em dash
            '\u201c': '"',    # left double quotation mark
            '\u201d': '"',    # right double quotation mark
            '\u2018': "'",    # left single quotation mark
            '\u2019': "'",    # right single quotation mark
            '\u2026': '...',  # horizontal ellipsis
        }
        for old, new in replacements.items():
            text = text.replace(old, new)

        # Decompose accented characters, then drop everything non-ASCII.
        text = unicodedata.normalize('NFKD', text)
        text = text.encode('ascii', 'ignore').decode('ascii')
        return text.strip()

    def extract_key_topics(self, script: str) -> List[str]:
        """Return up to five distinct topics: the first two words of each sentence.

        Sentences are split on ``.``; sentences with fewer than two words are
        ignored. Returns ``["default topic"]`` when ``script`` is not a string.
        """
        try:
            topics = []
            for sentence in script.split('.'):
                words = sentence.strip().split()
                if len(words) >= 2:
                    topics.append(' '.join(words[:2]))
            # dict.fromkeys removes duplicates while keeping the order.
            return list(dict.fromkeys(topics))[:5]
        except AttributeError as e:
            self.logger.error("Topic extraction failed: %s", e)
            return ["default topic"]

    def generate_ai_image(self, prompt: str, style: str) -> "Optional[Image.Image]":
        """Generate an image through ``self.stability_api``, if one is set.

        Returns ``None`` when no client is configured, when the client
        returns nothing, or when generation fails.
        """
        if not self.stability_api:
            return None
        style_prompts = {
            'Professional': "professional, corporate, clean, modern",
            'Creative': "artistic, vibrant, innovative, dynamic",
            'Educational': "clear, informative, academic, detailed",
        }
        enhanced_prompt = f"{prompt}, {style_prompts.get(style, '')}, high quality, 4k"
        try:
            response = self.stability_api.generate(
                prompt=enhanced_prompt,
                samples=1,
                width=1920,
                height=1080,
            )
            if response and len(response) > 0:
                return Image.open(io.BytesIO(response[0].image))
            return None
        except Exception as e:
            self.logger.error("AI image generation failed: %s", e)
            return None

    def cleanup(self):
        """Delete the temporary directories of this generator and its scraper."""
        scraper = getattr(self, 'image_scraper', None)
        directories = [
            getattr(self, 'temp_dir', None),
            getattr(scraper, 'temp_dir', None),
        ]
        for directory in directories:
            if directory is None or not directory.exists():
                continue
            try:
                shutil.rmtree(directory)
            except OSError as e:
                self.logger.error("Cleanup failed: %s", e)

    def __enter__(self):
        """Return the generator itself for use in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up temporary files; exceptions are not suppressed."""
        self.cleanup()
# Streamlit UI class
class VideoGeneratorUI:
    """Streamlit front end: script input, image selection, video generation."""

    _GENERATOR_STATE_KEY = "video_generator"
    _PAGE_CSS = """
        <style>
        .stApp {
            max-width: 1200px;
            margin: 0 auto;
        }
        .image-grid {
            display: grid;
            grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
            gap: 1rem;
        }
        .stButton>button {
            width: 100%;
        }
        </style>
    """

    def __init__(self):
        """Obtain the video generator and render the page."""
        # Streamlit re-runs the script on every widget interaction. Keeping
        # the generator in session state avoids reloading its model and
        # creating a new temporary directory on each rerun.
        if self._GENERATOR_STATE_KEY not in st.session_state:
            st.session_state[self._GENERATOR_STATE_KEY] = EnhancedVideoGenerator()
        self.generator = st.session_state[self._GENERATOR_STATE_KEY]
        self.setup_ui()

    def setup_ui(self):
        """Render the whole page for the current script run."""
        st.set_page_config(layout="wide")
        st.markdown(self._PAGE_CSS, unsafe_allow_html=True)

        st.title("VaultGenix Video Generator")
        st.markdown("Create professional videos for your digital legacy management platform")

        with st.container():
            prompt = st.text_area("Enter your video script", height=200)
            if not prompt:
                return
            try:
                selected_images = self._render_image_picker(prompt)
                if selected_images:
                    self._render_video_controls(prompt, selected_images)
                else:
                    # Nothing is ticked yet; this is not an error.
                    st.info("Select at least one image to continue.")
            except Exception as e:
                st.error(f"An error occurred: {str(e)}")
                print(f"Setup error: {str(e)}")

    def _render_image_picker(self, prompt: str) -> List[str]:
        """Show candidate images in three columns; return the ticked URLs."""
        scraper = self.generator.image_scraper
        st.subheader("Select Images for Your Video")
        with st.spinner("Fetching images..."):
            images = scraper.get_images(prompt)
        if not images:
            st.warning("No images found. Loading default technology images...")
            images = scraper.get_stock_images()

        selected_images = []
        cols = st.columns(3)
        for idx, img_url in enumerate(images):
            with cols[idx % 3]:
                try:
                    # Unreachable images are not offered for selection.
                    if not scraper.verify_image_url(img_url):
                        continue
                    st.image(img_url, use_container_width=True)
                    if st.checkbox(f"Select Image {idx + 1}", key=f"img_{idx}"):
                        selected_images.append(img_url)
                except Exception as e:
                    print(f"Error displaying image {idx}: {str(e)}")
        return selected_images

    def _render_video_controls(self, prompt: str, selected_images: List[str]) -> None:
        """Show style and duration inputs and the generate button."""
        st.subheader("Video Settings")
        col1, col2 = st.columns(2)
        with col1:
            style = st.selectbox(
                "Choose style",
                options=["Professional", "Creative", "Educational"],
                index=0,
            )
        with col2:
            duration = st.slider(
                "Video duration (seconds)",
                min_value=30,
                max_value=180,
                value=60,
                step=30,
            )

        if not st.button("Generate Video", type="primary"):
            return
        with st.spinner("Generating your video..."):
            try:
                file_name = f"vaultgenix_video_{int(time.time())}.mp4"
                # Write into the generator's temp directory so that output
                # files do not pile up in the working directory.
                output_path = str(self.generator.temp_dir / file_name)
                video_path = self.generator.create_video(
                    prompt,
                    style,
                    duration,
                    output_path,
                    selected_images,
                )
                if os.path.exists(video_path):
                    st.success("✨ Video generated successfully!")
                    st.video(video_path)
                    with open(video_path, 'rb') as video_file:
                        st.download_button(
                            "⬇️ Download Video",
                            video_file.read(),
                            file_name=file_name,
                            mime="video/mp4",
                        )
            except Exception as e:
                st.error(f"Failed to generate video: {str(e)}")
                print(f"Video generation error: {str(e)}")
def main() -> None:
    """Start the app: constructing the UI object renders the page."""
    VideoGeneratorUI()


if __name__ == "__main__":
    main()