Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from pathlib import Path | |
| import torch | |
| from typing import Tuple | |
| from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer | |
| from PIL import Image, ImageDraw, ImageFont | |
| import tempfile | |
| import os | |
| from moviepy.editor import * | |
| import numpy as np | |
| from gtts import gTTS | |
| import textwrap | |
| from concurrent.futures import ThreadPoolExecutor | |
| import io | |
| import unicodedata | |
| import re | |
| import requests | |
| import random | |
| import logging | |
| import time | |
| from typing import Optional, List, Dict, Tuple, Callable | |
| from bs4 import BeautifulSoup | |
| import requests | |
| from io import BytesIO | |
| class ImageScraper: | |
| def __init__(self): | |
| self.PIXABAY_API_KEY = "48069976-37e20099248207cee12385560" | |
| self.headers = { | |
| 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' | |
| } | |
| self.temp_dir = Path(tempfile.mkdtemp()) | |
| # Initialize keyword extractor model | |
| try: | |
| self.keyword_model = pipeline( | |
| "text-classification", | |
| model="facebook/bart-large-mnli", | |
| device=0 if torch.cuda.is_available() else -1 | |
| ) | |
| except Exception as e: | |
| print(f"Failed to load keyword model: {e}") | |
| self.keyword_model = None | |
| def extract_keywords(self, text: str) -> List[Dict[str, str]]: | |
| """Extract relevant keywords and categories from text using AI""" | |
| keywords = [] | |
| try: | |
| # Define candidate labels for classification | |
| candidate_labels = [ | |
| "technology", "science", "education", "business", | |
| "health", "nature", "people", "urban", "abstract", | |
| "sports", "food", "travel", "architecture", "art", | |
| "music", "fashion", "medical", "industrial", "space", | |
| "environmental", "historical", "cultural", "professional" | |
| ] | |
| # Use model to classify text against each label | |
| if self.keyword_model: | |
| results = self.keyword_model(text, candidate_labels, multi_label=True) | |
| # Filter results with high confidence | |
| for score, label in zip(results['scores'], results['labels']): | |
| if score > 0.3: # Confidence threshold | |
| keywords.append({ | |
| 'keyword': label, | |
| 'confidence': score, | |
| 'category': self.categorize_keyword(label) | |
| }) | |
| # Extract additional keywords using NLP | |
| additional_keywords = self.extract_noun_phrases(text) | |
| for keyword in additional_keywords: | |
| keywords.append({ | |
| 'keyword': keyword, | |
| 'confidence': 0.5, | |
| 'category': 'content_specific' | |
| }) | |
| # Sort by confidence | |
| keywords = sorted(keywords, key=lambda x: x['confidence'], reverse=True) | |
| return keywords | |
| except Exception as e: | |
| print(f"Keyword extraction error: {e}") | |
| return self.get_fallback_keywords() | |
| def process_images(images): | |
| processed_images = [] | |
| for img in images: | |
| # Load and resize image | |
| image = Image.open(requests.get(img['url'], stream=True).raw) | |
| image = image.resize((640, 480)) # Resize to reduce memory usage | |
| processed_images.append(image) | |
| return processed_images | |
| def extract_noun_phrases(self, text: str) -> List[str]: | |
| """Extract important noun phrases from text""" | |
| words = text.lower().split() | |
| phrases = [] | |
| # Common adjectives that might indicate important concepts | |
| adjectives = {'digital', 'smart', 'modern', 'advanced', 'innovative', | |
| 'technical', 'professional', 'creative', 'strategic'} | |
| for i in range(len(words)-1): | |
| if words[i] in adjectives: | |
| phrases.append(f"{words[i]} {words[i+1]}") | |
| return list(set(phrases)) | |
| def categorize_keyword(self, keyword: str) -> str: | |
| """Categorize keyword into general themes""" | |
| categories = { | |
| 'technical': {'technology', 'digital', 'software', 'computer', 'cyber'}, | |
| 'scientific': {'science', 'research', 'laboratory', 'experiment'}, | |
| 'business': {'business', 'professional', 'corporate', 'office'}, | |
| 'educational': {'education', 'learning', 'teaching', 'academic'}, | |
| 'creative': {'art', 'design', 'creative', 'innovation'}, | |
| } | |
| for category, terms in categories.items(): | |
| if any(term in keyword.lower() for term in terms): | |
| return category | |
| return 'general' | |
| def extract_key_topics(self, script: str) -> List[str]: | |
| """Extract key topics from script with improved VaultGenix-specific processing""" | |
| try: | |
| # Define relevant categories for VaultGenix | |
| categories = { | |
| 'legacy': [ | |
| 'digital legacy', 'legacy management', 'digital estate', | |
| 'posthumous', 'inheritance', 'heir', 'custodian' | |
| ], | |
| 'security': [ | |
| 'encryption', 'security', 'protection', 'privacy', 'AES-256', | |
| 'data security', 'secure', 'authentication' | |
| ], | |
| 'technology': [ | |
| 'AI', 'artificial intelligence', 'platform', 'digital', | |
| 'automation', 'analytics' | |
| ], | |
| 'management': [ | |
| 'asset management', 'directive', 'planning', 'preservation', | |
| 'customization', 'optimization' | |
| ], | |
| 'identity': [ | |
| 'digital identity', 'presence', 'account', 'profile', | |
| 'digital footprint' | |
| ] | |
| } | |
| # Process text | |
| text = script.lower() | |
| found_topics = set() | |
| # Extract category-based matches | |
| for category, terms in categories.items(): | |
| for term in terms: | |
| if term in text: | |
| # Add both the term and its category combination | |
| found_topics.add(term) | |
| if category in ['legacy', 'security', 'technology']: | |
| found_topics.add(f"digital {term}") | |
| found_topics.add(f"{category} management") | |
| # Extract key compound phrases | |
| important_phrases = [ | |
| 'digital legacy management', | |
| 'AI-driven platform', | |
| 'digital estate planning', | |
| 'legacy preservation', | |
| 'secure inheritance', | |
| 'digital asset protection', | |
| 'intelligent legacy system', | |
| 'automated legacy management', | |
| 'digital identity preservation', | |
| 'secure legacy platform' | |
| ] | |
| for phrase in important_phrases: | |
| if phrase.lower() in text: | |
| found_topics.add(phrase) | |
| # Prioritize topics based on VaultGenix focus | |
| priority_topics = sorted( | |
| found_topics, | |
| key=lambda x: ( | |
| 'digital legacy' in x, | |
| 'security' in x or 'secure' in x, | |
| 'AI' in x.lower() or 'intelligence' in x.lower(), | |
| 'management' in x, | |
| len(x.split()) # Prefer compound terms | |
| ), | |
| reverse=True | |
| ) | |
| # Return top unique topics | |
| return list(dict.fromkeys(priority_topics))[:8] | |
| except Exception as e: | |
| self.logger.error(f"Topic extraction error: {e}") | |
| return [ | |
| 'digital legacy management', | |
| 'secure inheritance', | |
| 'AI-driven platform', | |
| 'digital asset protection', | |
| 'legacy preservation' | |
| ] | |
| def get_images_for_keyword(self, keyword: str) -> List[Dict[str, str]]: | |
| """Get images for a specific keyword with improved relevance""" | |
| try: | |
| # Enhance keyword for better search results | |
| enhanced_keywords = { | |
| 'digital': 'digital technology security', | |
| 'security': 'cybersecurity protection', | |
| 'legacy': 'digital legacy inheritance', | |
| 'management': 'digital management system', | |
| 'AI': 'artificial intelligence technology', | |
| 'protection': 'data protection security' | |
| } | |
| search_term = enhanced_keywords.get(keyword, keyword) | |
| base_url = "https://pixabay.com/api/" | |
| params = { | |
| 'key': self.PIXABAY_API_KEY, | |
| 'q': search_term, | |
| 'image_type': 'photo', | |
| 'per_page': 5, | |
| 'safesearch': True, | |
| 'lang': 'en', | |
| 'category': 'technology', # Focus on technology category | |
| 'orientation': 'horizontal' # Better for video | |
| } | |
| response = requests.get(base_url, params=params, headers=self.headers) | |
| if response.status_code == 200: | |
| data = response.json() | |
| if 'hits' in data and data['hits']: | |
| return [{ | |
| 'url': img['largeImageURL'], | |
| 'keyword': keyword, | |
| 'relevance': 'Primary match' if keyword.lower() in img['tags'].lower() else 'Related', | |
| 'tags': img['tags'] | |
| } for img in data['hits']] | |
| return [] | |
| except Exception as e: | |
| print(f"Error fetching images for keyword {keyword}: {e}") | |
| return [] | |
| def get_pixabay_images(self, query: str) -> List[str]: | |
| """Get images from Pixabay API with enhanced error handling""" | |
| try: | |
| # Clean and encode the query | |
| clean_query = query.replace(' ', '+').strip() | |
| base_url = "https://pixabay.com/api/" | |
| params = { | |
| 'key': self.PIXABAY_API_KEY, | |
| 'q': clean_query, | |
| 'image_type': 'photo', | |
| 'per_page': 20, | |
| 'safesearch': True, | |
| 'lang': 'en' | |
| } | |
| response = requests.get(base_url, params=params, headers=self.headers) | |
| # Debug logging | |
| print(f"Pixabay API URL: {response.url}") | |
| print(f"Response status: {response.status_code}") | |
| if response.status_code == 200: | |
| data = response.json() | |
| print(f"Total hits: {data.get('totalHits', 0)}") | |
| if 'hits' in data and data['hits']: | |
| image_urls = [img['largeImageURL'] for img in data['hits']] | |
| print(f"Found {len(image_urls)} images") | |
| return image_urls | |
| else: | |
| print("No images found in response") | |
| return self.get_stock_images() | |
| else: | |
| print(f"Pixabay API error: Status code {response.status_code}") | |
| return self.get_stock_images() | |
| except Exception as e: | |
| print(f"Exception in get_pixabay_images: {str(e)}") | |
| return self.get_stock_images() | |
| def get_stock_images(self) -> List[str]: | |
| """Return preset stock images as fallback""" | |
| return [ | |
| "https://images.pexels.com/photos/60504/security-protection-anti-virus-software-60504.jpeg", | |
| "https://images.pexels.com/photos/5380642/pexels-photo-5380642.jpeg", | |
| "https://images.pexels.com/photos/2582937/pexels-photo-2582937.jpeg", | |
| "https://images.pexels.com/photos/7319074/pexels-photo-7319074.jpeg", | |
| "https://images.pexels.com/photos/4164418/pexels-photo-4164418.jpeg", | |
| "https://images.pexels.com/photos/3861969/pexels-photo-3861969.jpeg", | |
| "https://images.pexels.com/photos/5473298/pexels-photo-5473298.jpeg", | |
| "https://images.pexels.com/photos/4348401/pexels-photo-4348401.jpeg", | |
| "https://images.pexels.com/photos/8386440/pexels-photo-8386440.jpeg", | |
| "https://images.pexels.com/photos/5473950/pexels-photo-5473950.jpeg" | |
| ] | |
| def get_images(self, query: str, num_images: int = 15) -> Dict[str, List[Dict[str, str]]]: | |
| """Get images with enhanced AI-driven selection and ranking""" | |
| try: | |
| # Extract key topics and their importance | |
| topics = self.extract_key_topics(query) | |
| topic_scores = {topic: score for score, topic in | |
| zip(np.linspace(1.0, 0.6, len(topics)), topics)} | |
| # Initialize categories | |
| result = { | |
| 'primary': [], | |
| 'secondary': [], | |
| 'general': [] | |
| } | |
| # Fetch and analyze images for each topic | |
| for topic, base_score in topic_scores.items(): | |
| images = self.get_images_for_keyword(topic) | |
| for img in images: | |
| # Enhanced relevance scoring | |
| relevance_score = self.calculate_relevance_score(img, topic, base_score, query) | |
| img['relevance_score'] = relevance_score | |
| # Categorize based on relevance score | |
| if relevance_score > 0.8: | |
| result['primary'].append(img) | |
| elif relevance_score > 0.6: | |
| result['secondary'].append(img) | |
| else: | |
| result['general'].append(img) | |
| # Sort each category by relevance score | |
| for category in result: | |
| result[category] = sorted( | |
| result[category], | |
| key=lambda x: x['relevance_score'], | |
| reverse=True | |
| )[:num_images // 3] # Limit images per category | |
| return result | |
| except Exception as e: | |
| print(f"Error in get_images: {str(e)}") | |
| return self.get_fallback_images(num_images) | |
| def calculate_relevance_score(self, image: Dict[str, str], topic: str, base_score: float, query: str) -> float: | |
| """Calculate enhanced relevance score for an image""" | |
| score = base_score | |
| # Analyze image tags | |
| tags = set(image['tags'].lower().split(',')) | |
| query_words = set(query.lower().split()) | |
| # Direct matches with query | |
| query_matches = len(tags.intersection(query_words)) | |
| score += query_matches * 0.1 | |
| # Topic relevance | |
| if topic.lower() in tags: | |
| score += 0.2 | |
| # Context relevance | |
| relevant_terms = { | |
| 'digital': 0.15, | |
| 'security': 0.15, | |
| 'technology': 0.1, | |
| 'professional': 0.1, | |
| 'modern': 0.05 | |
| } | |
| for term, weight in relevant_terms.items(): | |
| if term in tags: | |
| score += weight | |
| return min(score, 1.0) # Normalize to 0-1 | |
| def score_keywords(self, query: str, keywords: List[str]) -> Dict[str, float]: | |
| """Score keywords based on relevance to query""" | |
| scores = {} | |
| query_words = set(query.lower().split()) | |
| for keyword in keywords: | |
| score = 0.0 | |
| keyword_words = set(keyword.lower().split()) | |
| # Direct word match | |
| word_matches = len(keyword_words.intersection(query_words)) | |
| score += word_matches * 0.3 | |
| # Contextual relevance | |
| context_terms = { | |
| 'digital': 0.8, | |
| 'security': 0.7, | |
| 'legacy': 0.9, | |
| 'protection': 0.6, | |
| 'management': 0.5, | |
| 'AI': 0.8, | |
| 'technology': 0.6 | |
| } | |
| for term, weight in context_terms.items(): | |
| if term in keyword.lower(): | |
| score += weight | |
| scores[keyword] = min(score, 1.0) # Normalize to 0-1 | |
| return scores | |
| def analyze_image_relevance(self, image: Dict[str, str], query: str) -> float: | |
| """Analyze image relevance based on tags and metadata""" | |
| score = 0.0 | |
| # Analyze tags | |
| tags = set(image['tags'].lower().split(',')) | |
| query_words = set(query.lower().split()) | |
| # Tag matching | |
| matching_tags = len(tags.intersection(query_words)) | |
| score += matching_tags * 0.2 | |
| # Context relevance | |
| relevant_terms = { | |
| 'technology': 0.3, | |
| 'digital': 0.3, | |
| 'security': 0.3, | |
| 'business': 0.2, | |
| 'professional': 0.2, | |
| 'modern': 0.1 | |
| } | |
| for term, weight in relevant_terms.items(): | |
| if term in tags: | |
| score += weight | |
| return min(score, 1.0) # Normalize to 0-1 | |
| def get_fallback_keywords(self) -> List[Dict[str, str]]: | |
| """Return fallback keywords if AI extraction fails""" | |
| return [ | |
| {'keyword': 'technology', 'confidence': 1.0, 'category': 'technical'}, | |
| {'keyword': 'business', 'confidence': 0.8, 'category': 'business'}, | |
| {'keyword': 'professional', 'confidence': 0.8, 'category': 'business'}, | |
| {'keyword': 'digital', 'confidence': 0.7, 'category': 'technical'} | |
| ] | |
| def verify_image_url(self, url: str) -> bool: | |
| """Verify if an image URL is accessible""" | |
| try: | |
| response = requests.head(url, timeout=5) | |
| return response.status_code == 200 | |
| except: | |
| return False | |
| def generate_fallback_audio(self, script: str) -> AudioFileClip: | |
| """Generate fallback audio using gTTS""" | |
| try: | |
| audio_path = self.temp_dir / "voice.mp3" | |
| tts = gTTS(text=script, lang='en', slow=False) | |
| tts.save(str(audio_path)) | |
| return AudioFileClip(str(audio_path)) | |
| except Exception as e: | |
| print(f"Fallback audio generation failed: {e}") | |
| # Create silent audio clip | |
| return AudioFileClip(str(audio_path)) if os.path.exists(str(audio_path)) else None | |
| def scrape_pexels(self, query: str) -> List[str]: | |
| urls = [] | |
| try: | |
| url = f"https://www.pexels.com/search/{query.replace(' ', '%20')}/" | |
| response = requests.get(url, headers=self.headers) | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| # Updated selector to target image sources | |
| for img in soup.find_all('img', {'data-image-width': True}): | |
| if img.get('src') and 'photos' in img['src']: | |
| urls.append(img['src']) | |
| except Exception as e: | |
| print(f"Pexels scraping error: {e}") | |
| return urls | |
| def scrape_unsplash(self, query: str) -> List[str]: | |
| urls = [] | |
| try: | |
| url = f"https://unsplash.com/s/photos/{query.replace(' ', '-')}" | |
| response = requests.get(url, headers=self.headers) | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| # Updated selector for Unsplash | |
| for img in soup.find_all('img', {'srcset': True}): | |
| src = img.get('src') | |
| if src and 'images.unsplash.com' in src: | |
| urls.append(src) | |
| except Exception as e: | |
| print(f"Unsplash scraping error: {e}") | |
| return urls | |
| class EnhancedVideoGenerator: | |
| def __init__(self): | |
| try: | |
| self.setup_logging() | |
| self.setup_device() | |
| self.initialize_models() | |
| self.setup_workspace() | |
| self.load_assets() | |
| self.setup_themes() | |
| self.image_scraper = ImageScraper() | |
| # Add missing configurations | |
| self.ELEVEN_LABS_API_KEY = "sk_acdad9d2d82d504bddbe5ed4aa290ca772c106aed5b128ba" # Replace with actual key | |
| self.temp_dir = Path(tempfile.mkdtemp()) | |
| except Exception as e: | |
| logging.error(f"Initialization failed: {str(e)}") | |
| raise RuntimeError("Failed to initialize video generator") | |
| def generate_video_in_background(script): | |
| with ThreadPoolExecutor() as executor: | |
| future = executor.submit(generate_video, script) | |
| return future.result() # Wait for the result without blocking UI | |
| def generate_video(script): | |
| st.progress(0) | |
| try: | |
| # Your video generation logic here | |
| for i in range(100): # Simulate progress | |
| time.sleep(0.1) # Simulate work being done | |
| st.progress(i + 1) # Update progress bar | |
| except Exception as e: | |
| logging.error(f"Error during video generation: {e}") | |
| st.error("An error occurred while generating the video.") | |
| def generate_fallback_audio(self, script: str) -> AudioFileClip: | |
| """Generate fallback audio using gTTS""" | |
| try: | |
| audio_path = self.temp_dir / "voice.mp3" | |
| tts = gTTS(text=script, lang='en', slow=False) | |
| tts.save(str(audio_path)) | |
| return AudioFileClip(str(audio_path)) | |
| except Exception as e: | |
| print(f"Fallback audio generation failed: {e}") | |
| # Create silent audio clip | |
| return AudioFileClip(str(audio_path)) if os.path.exists(str(audio_path)) else None | |
| def apply_video_effects(self, frame: np.ndarray, effect_params: dict) -> np.ndarray: | |
| """Apply various video effects to a frame""" | |
| try: | |
| # Convert to PIL Image for effects | |
| img = Image.fromarray(frame) | |
| # Apply zoom if specified | |
| if effect_params.get('zoom'): | |
| zoom_factor = effect_params['zoom'] | |
| w, h = img.size | |
| zoom_w = int(w * zoom_factor) | |
| zoom_h = int(h * zoom_factor) | |
| # Calculate crop box to maintain center | |
| left = (zoom_w - w) // 2 | |
| top = (zoom_h - h) // 2 | |
| right = left + w | |
| bottom = top + h | |
| img = img.resize((zoom_w, zoom_h), Image.LANCZOS) | |
| img = img.crop((left, top, right, bottom)) | |
| # Apply brightness adjustment | |
| if 'brightness' in effect_params: | |
| enhancer = ImageEnhance.Brightness(img) | |
| img = enhancer.enhance(effect_params['brightness']) | |
| # Apply contrast adjustment | |
| if 'contrast' in effect_params: | |
| enhancer = ImageEnhance.Contrast(img) | |
| img = enhancer.enhance(effect_params['contrast']) | |
| # Apply blur effect | |
| if effect_params.get('blur'): | |
| frame = np.array(img) | |
| frame = cv2.GaussianBlur(frame, (5, 5), 0) | |
| return frame | |
| return np.array(img) | |
| except Exception as e: | |
| self.logger.error(f"Effect application failed: {str(e)}") | |
| return frame | |
| def setup_logging(self): | |
| """Configure logging for the application""" | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler('video_generator.log'), | |
| logging.StreamHandler() | |
| ] | |
| ) | |
| self.logger = logging.getLogger(__name__) | |
| def setup_device(self): | |
| """Set up computing device (CPU/GPU)""" | |
| self.device = "cuda" if torch.cuda.is_available() else "cpu" | |
| self.logger.info(f"Using device: {self.device}") | |
| def initialize_models(self): | |
| """Initialize all AI models""" | |
| try: | |
| # Text generation model initialization with error handling | |
| try: | |
| self.text_generator = pipeline( | |
| 'text-generation', | |
| model='gpt2', | |
| device=0 if self.device == "cuda" else -1 | |
| ) | |
| except Exception as e: | |
| self.logger.warning(f"Text generator initialization failed: {str(e)}") | |
| self.text_generator = None | |
| # Skip the StableDiffusion model initialization as it requires additional setup | |
| self.image_model = None | |
| # Initialize stability API attribute | |
| self.stability_api = None | |
| except Exception as e: | |
| self.logger.error(f"Model initialization failed: {str(e)}") | |
| # Don't raise exception, allow initialization with degraded functionality | |
| pass | |
| def setup_workspace(self): | |
| """Set up working directory and resources""" | |
| self.temp_dir = Path(tempfile.mkdtemp()) | |
| self.asset_dir = self.temp_dir / "assets" | |
| self.asset_dir.mkdir(exist_ok=True) | |
| def setup_themes(self): | |
| """Set up visual themes""" | |
| self.themes = { | |
| 'Professional': { | |
| 'bg': (240, 240, 240), | |
| 'accent': (0, 120, 212), | |
| 'text': (33, 33, 33) | |
| }, | |
| 'Creative': { | |
| 'bg': (255, 250, 240), | |
| 'accent': (255, 123, 0), | |
| 'text': (51, 51, 51) | |
| }, | |
| 'Educational': { | |
| 'bg': (248, 249, 250), | |
| 'accent': (40, 167, 69), | |
| 'text': (33, 37, 41) | |
| } | |
| } | |
| def load_assets(self): | |
| """Load visual assets and fonts""" | |
| try: | |
| # Try multiple font options | |
| font_options = [ | |
| "arial.ttf", | |
| "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", | |
| "/System/Library/Fonts/Helvetica.ttc" | |
| ] | |
| for font_path in font_options: | |
| try: | |
| self.font = ImageFont.truetype(font_path, 40) | |
| break | |
| except OSError: | |
| continue | |
| else: | |
| self.font = ImageFont.load_default() | |
| self.logger.warning("Using default font - custom font loading failed") | |
| except Exception as e: | |
| self.logger.error(f"Asset loading failed: {str(e)}") | |
| def generate_visual_assets(self, script: str, style: str) -> List[Dict]: | |
| """Generate relevant visual assets based on script content""" | |
| try: | |
| # Extract key topics from script | |
| topics = self.extract_key_topics(script) | |
| assets = [] | |
| for topic in topics: | |
| # Generate AI image | |
| image = self.generate_ai_image(topic, style) | |
| if image: | |
| assets.append({ | |
| 'type': 'image', | |
| 'data': image, | |
| 'topic': topic | |
| }) | |
| return assets | |
| except Exception as e: | |
| self.logger.error(f"Visual asset generation failed: {str(e)}") | |
| return [] | |
| def create_enhanced_frame( | |
| self, | |
| text: str, | |
| theme: dict, | |
| frame_number: int, | |
| total_frames: int, | |
| background_image: Optional[Image.Image] = None, | |
| size: Tuple[int, int] = (1920, 1080) # Upgraded to 1080p | |
| ) -> np.ndarray: | |
| """Create a visually enhanced frame with background, text, and effects""" | |
| try: | |
| # Create base frame | |
| if background_image: | |
| # Resize and crop background to fit | |
| bg = background_image.resize(size, Image.LANCZOS) | |
| frame = np.array(bg) | |
| else: | |
| frame = np.full((size[1], size[0], 3), theme['bg'], dtype=np.uint8) | |
| # Convert to PIL Image for drawing | |
| img = Image.fromarray(frame) | |
| draw = ImageDraw.Draw(img, 'RGBA') | |
| # Add subtle gradient overlay | |
| overlay = Image.new('RGBA', size, (0, 0, 0, 0)) | |
| overlay_draw = ImageDraw.Draw(overlay) | |
| overlay_draw.rectangle( | |
| [0, 0, size[0], size[1]], | |
| fill=(255, 255, 255, 100) # Semi-transparent white | |
| ) | |
| img = Image.alpha_composite(img.convert('RGBA'), overlay) | |
| # Add text with improved styling | |
| text = self.clean_text(text) | |
| wrapped_text = textwrap.fill(text, width=50) | |
| # Calculate text position | |
| text_bbox = draw.textbbox((0, 0), wrapped_text, font=self.font) | |
| text_width = text_bbox[2] - text_bbox[0] | |
| text_height = text_bbox[3] - text_bbox[1] | |
| text_x = (size[0] - text_width) // 2 | |
| text_y = size[1] - text_height - 100 # Position at bottom | |
| # Draw text background | |
| padding = 20 | |
| draw.rectangle( | |
| [ | |
| text_x - padding, | |
| text_y - padding, | |
| text_x + text_width + padding, | |
| text_y + text_height + padding | |
| ], | |
| fill=(0, 0, 0, 160) # Semi-transparent black | |
| ) | |
| # Draw text | |
| draw.text( | |
| (text_x, text_y), | |
| wrapped_text, | |
| fill=(255, 255, 255, 255), | |
| font=self.font | |
| ) | |
| # Add progress bar with animation | |
| self.draw_animated_progress_bar( | |
| draw, | |
| frame_number, | |
| total_frames, | |
| size, | |
| theme | |
| ) | |
| return np.array(img) | |
| except Exception as e: | |
| self.logger.error(f"Frame creation failed: {str(e)}") | |
| # Return fallback frame | |
| return np.full((size[1], size[0], 3), theme['bg'], dtype=np.uint8) | |
| def draw_animated_progress_bar( | |
| self, | |
| draw: ImageDraw.Draw, | |
| frame_number: int, | |
| total_frames: int, | |
| size: Tuple[int, int], | |
| theme: dict | |
| ): | |
| """Draw an animated progress bar with effects""" | |
| try: | |
| progress = frame_number / total_frames | |
| bar_width = int(size[0] * 0.8) # 80% of screen width | |
| bar_height = 6 | |
| x_offset = (size[0] - bar_width) // 2 | |
| y_position = size[1] - 40 | |
| # Draw background bar | |
| draw.rectangle( | |
| [x_offset, y_position, x_offset + bar_width, y_position + bar_height], | |
| fill=(200, 200, 200, 160) | |
| ) | |
| # Draw progress with gradient effect | |
| progress_width = int(bar_width * progress) | |
| for x in range(progress_width): | |
| alpha = int(255 * (x / bar_width)) # Gradient effect | |
| draw.line( | |
| [x_offset + x, y_position, x_offset + x, y_position + bar_height], | |
| fill=(theme['accent'][0], theme['accent'][1], theme['accent'][2], alpha) | |
| ) | |
| # Add animated highlight | |
| highlight_pos = x_offset + progress_width | |
| if highlight_pos < x_offset + bar_width: | |
| draw.rectangle( | |
| [highlight_pos-2, y_position-1, highlight_pos+2, y_position + bar_height+1], | |
| fill=(255, 255, 255, 200) | |
| ) | |
| except Exception as e: | |
| self.logger.error(f"Progress bar drawing failed: {str(e)}") | |
| def generate_voice_over(self, script: str) -> AudioFileClip: | |
| try: | |
| # Try ElevenLabs first | |
| audio_path = self.temp_dir / "voice.mp3" | |
| headers = { | |
| "xi-api-key": self.ELEVEN_LABS_API_KEY, | |
| "Content-Type": "application/json" | |
| } | |
| data = { | |
| "text": script, | |
| "model_id": "eleven_monolingual_v1", | |
| "voice_settings": { | |
| "stability": 0.75, | |
| "similarity_boost": 0.75 | |
| } | |
| } | |
| response = requests.post( | |
| "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM", | |
| headers=headers, | |
| json=data | |
| ) | |
| if response.status_code == 200: | |
| with open(audio_path, "wb") as f: | |
| f.write(response.content) | |
| else: | |
| # Fallback to Azure TTS | |
| speech_config = speechsdk.SpeechConfig( | |
| subscription=self.AZURE_SPEECH_KEY, | |
| region=self.AZURE_REGION | |
| ) | |
| speech_config.speech_synthesis_voice_name = "en-US-JennyNeural" | |
| synthesizer = speechsdk.SpeechSynthesizer(speech_config=speech_config) | |
| result = synthesizer.speak_text_async(script).get() | |
| if result.reason == speechsdk.ResultReason.SynthesizingAudioCompleted: | |
| with open(audio_path, "wb") as f: | |
| f.write(result.audio_data) | |
| return AudioFileClip(str(audio_path)) | |
| except Exception as e: | |
| print(f"Voice generation error: {e}") | |
| return self.generate_fallback_audio(script) | |
| def generate_subtitles(self, script: str, duration: int) -> str: | |
| words = script.split() | |
| words_per_second = len(words) / duration | |
| subtitle_path = self.temp_dir / "subtitles.srt" | |
| with open(subtitle_path, 'w') as f: | |
| current_time = 0 | |
| words_per_subtitle = int(words_per_second * 3) # 3 seconds per subtitle | |
| for i in range(0, len(words), words_per_subtitle): | |
| subtitle_words = words[i:i + words_per_subtitle] | |
| if subtitle_words: | |
| start_time = self.format_time(current_time) | |
| current_time += len(subtitle_words) / words_per_second | |
| end_time = self.format_time(current_time) | |
| f.write(f"{i//words_per_subtitle + 1}\n") | |
| f.write(f"{start_time} --> {end_time}\n") | |
| f.write(f"{' '.join(subtitle_words)}\n\n") | |
| return str(subtitle_path) | |
| def format_time(seconds: float) -> str: | |
| hours = int(seconds // 3600) | |
| minutes = int((seconds % 3600) // 60) | |
| secs = int(seconds % 60) | |
| msecs = int((seconds - int(seconds)) * 1000) | |
| return f"{hours:02d}:{minutes:02d}:{secs:02d},{msecs:03d}" | |
| def create_video(self, script: str, style: str, duration: int, output_path: str, | |
| selected_images: List[str], video_effects: dict = None, | |
| progress_callback: Callable[[float], None] = None) -> str: | |
| """Create video with enhanced error handling and progress tracking""" | |
| try: | |
| if not selected_images: | |
| raise ValueError("No images provided for video generation") | |
| # Initialize moviepy | |
| import moviepy.editor as mpy | |
| from moviepy.editor import ImageSequenceClip, AudioFileClip | |
| # Process images | |
| processed_images = [] | |
| for idx, img_url in enumerate(selected_images): | |
| try: | |
| response = requests.get(img_url, timeout=10) | |
| img = Image.open(BytesIO(response.content)) | |
| img = img.convert('RGB').resize((1920, 1080), Image.LANCZOS) | |
| processed_images.append(np.array(img)) | |
| if progress_callback: | |
| progress_callback((idx + 1) / len(selected_images) * 20) | |
| except Exception as e: | |
| logging.error(f"Error processing image {img_url}: {e}") | |
| continue | |
| if not processed_images: | |
| raise ValueError("Failed to process any images") | |
| # Generate audio | |
| if progress_callback: | |
| progress_callback(25) | |
| audio = self.generate_voice_over(script) | |
| # Calculate video parameters | |
| fps = 30 | |
| total_duration = duration | |
| frames_per_image = int(fps * total_duration / len(processed_images)) | |
| # Generate frames with effects | |
| frames = [] | |
| for img_array in processed_images: | |
| for _ in range(frames_per_image): | |
| if video_effects and video_effects.get('zoom'): | |
| # Apply zoom effect | |
| zoom = video_effects['zoom'] | |
| h, w = img_array.shape[:2] | |
| scaled_h, scaled_w = int(h * zoom), int(w * zoom) | |
| img = Image.fromarray(img_array).resize((scaled_w, scaled_h), Image.LANCZOS) | |
| # Crop to original size from center | |
| left = (scaled_w - w) // 2 | |
| top = (scaled_h - h) // 2 | |
| img = img.crop((left, top, left + w, top + h)) | |
| frames.append(np.array(img)) | |
| else: | |
| frames.append(img_array) | |
| if progress_callback: | |
| progress_callback(60) | |
| # Create video clip | |
| video_clip = ImageSequenceClip(frames, fps=fps) | |
| # Adjust audio duration if needed | |
| if audio.duration > video_clip.duration: | |
| audio = audio.subclip(0, video_clip.duration) | |
| elif audio.duration < video_clip.duration: | |
| video_clip = video_clip.subclip(0, audio.duration) | |
| # Combine video and audio | |
| final_clip = video_clip.set_audio(audio) | |
| if progress_callback: | |
| progress_callback(80) | |
| # Ensure output directory exists | |
| os.makedirs(os.path.dirname(output_path), exist_ok=True) | |
| # Write final video | |
| final_clip.write_videofile( | |
| output_path, | |
| fps=fps, | |
| codec='libx264', | |
| audio_codec='aac', | |
| ffmpeg_params=['-pix_fmt', 'yuv420p'], | |
| verbose=False, | |
| logger=None | |
| ) | |
| if progress_callback: | |
| progress_callback(100) | |
| return output_path | |
| except Exception as e: | |
| logging.error(f"Video creation failed: {str(e)}") | |
| raise | |
| finally: | |
| # Cleanup | |
| try: | |
| if 'video_clip' in locals(): | |
| video_clip.close() | |
| if 'final_clip' in locals(): | |
| final_clip.close() | |
| if 'audio' in locals(): | |
| audio.close() | |
| except Exception as e: | |
| logging.error(f"Cleanup error: {e}") | |
| def generate_visual_assets(self, script: str, style: str) -> List[Dict]: | |
| """Generate relevant visual assets based on script content""" | |
| try: | |
| # Simplified asset generation for faster processing | |
| topics = self.extract_key_topics(script)[:3] # Limit to 3 topics | |
| assets = [] | |
| for topic in topics: | |
| # Create simple colored backgrounds instead of AI images | |
| img = Image.new('RGB', (1920, 1080), self.themes[style]['bg']) | |
| assets.append({ | |
| 'type': 'image', | |
| 'data': img, | |
| 'topic': topic | |
| }) | |
| return assets | |
| except Exception as e: | |
| self.logger.error(f"Visual asset generation failed: {str(e)}") | |
| return [] | |
| def clean_text(text: str) -> str: | |
| """Clean and normalize text for display""" | |
| if not isinstance(text, str): | |
| text = str(text) | |
| # Normalize unicode characters | |
| text = unicodedata.normalize('NFKD', text) | |
| # Remove non-ASCII characters | |
| text = text.encode('ascii', 'ignore').decode('ascii') | |
| # Replace problematic characters | |
| replacements = { | |
| '–': '-', # en dash | |
| '—': '-', # em dash | |
| '"': '"', # smart quotes | |
| '"': '"', # smart quotes | |
| ''': "'", # smart apostrophe | |
| ''': "'", # smart apostrophe | |
| '…': '...', # ellipsis | |
| } | |
| for old, new in replacements.items(): | |
| text = text.replace(old, new) | |
| # Remove any remaining non-standard characters | |
| text = re.sub(r'[^\x00-\x7F]+', '', text) | |
| return text.strip() | |
| def generate_ai_image(self, prompt: str, style: str) -> Optional[Image.Image]: | |
| """Generate an AI image using Stability AI""" | |
| try: | |
| if not self.stability_api: | |
| return None | |
| # Enhance prompt based on style | |
| style_prompts = { | |
| 'Professional': "professional, corporate, clean, modern", | |
| 'Creative': "artistic, vibrant, innovative, dynamic", | |
| 'Educational': "clear, informative, academic, detailed" | |
| } | |
| enhanced_prompt = f"{prompt}, {style_prompts.get(style, '')}, high quality, 4k" | |
| # Generate image | |
| response = self.stability_api.generate( | |
| prompt=enhanced_prompt, | |
| samples=1, | |
| width=1920, | |
| height=1080 | |
| ) | |
| if response and len(response) > 0: | |
| image_data = response[0].image | |
| return Image.open(io.BytesIO(image_data)) | |
| return None | |
| except Exception as e: | |
| self.logger.error(f"AI image generation failed: {str(e)}") | |
| return None | |
| def cleanup(self): | |
| """Clean up temporary files and resources""" | |
| try: | |
| for file in self.temp_dir.glob('*'): | |
| try: | |
| if file.is_file(): | |
| file.unlink() | |
| elif file.is_dir(): | |
| import shutil | |
| shutil.rmtree(file) | |
| except Exception as e: | |
| self.logger.warning(f"Failed to delete {file}: {str(e)}") | |
| self.temp_dir.rmdir() | |
| except Exception as e: | |
| self.logger.error(f"Cleanup failed: {str(e)}") | |
| def __enter__(self): | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| self.cleanup() | |
| # Streamlit UI Class | |
| class VideoGeneratorUI: | |
| def __init__(self): | |
| self.generator = EnhancedVideoGenerator() | |
| self.setup_ui() | |
| def setup_ui(self): | |
| st.set_page_config(layout="wide") | |
| # Initialize session state | |
| if 'processing_complete' not in st.session_state: | |
| st.session_state.processing_complete = False | |
| if 'current_step' not in st.session_state: | |
| st.session_state.current_step = 'input' | |
| st.title("VaultGenix Video Generator") | |
| st.markdown("Create professional videos for your digital legacy management platform") | |
| # Add sidebar for advanced settings | |
| with st.sidebar: | |
| st.subheader("Advanced Settings") | |
| st.session_state.enable_transitions = st.checkbox("Enable Transitions", value=True) | |
| st.session_state.enable_captions = st.checkbox("Enable Captions", value=True) | |
| st.session_state.high_quality = st.checkbox("High Quality Export", value=True) | |
| # Main content area | |
| with st.container(): | |
| if st.session_state.current_step == 'input': | |
| self.show_input_form() | |
| elif st.session_state.current_step == 'video_generation': | |
| self.show_video_generation() | |
| def show_input_form(self): | |
| with st.form(key='prompt_form'): | |
| prompt = st.text_area("Enter your video script", height=200) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.session_state.video_style = st.selectbox( | |
| "Choose style", | |
| ["Professional", "Creative", "Educational"] | |
| ) | |
| with col2: | |
| st.session_state.voice_style = st.selectbox( | |
| "Choose voice style", | |
| ["Professional Male", "Professional Female", "Casual Male", "Casual Female"] | |
| ) | |
| submit_button = st.form_submit_button(label='Generate Video') | |
| # Add this in the show_input_form method | |
| if len(prompt) > 5000: # Adjust this limit as needed | |
| st.error("Script is too long. Please limit to 5000 characters.") | |
| return | |
| if submit_button and prompt: | |
| st.session_state.prompt = prompt | |
| # Automatically select images based on AI analysis | |
| with st.spinner("AI analyzing script and selecting relevant images..."): | |
| self.auto_select_images() | |
| st.session_state.current_step = 'video_generation' | |
| st.rerun() | |
| def auto_select_images(self): | |
| """Automatically select the most relevant images based on AI analysis""" | |
| try: | |
| keywords = self.generator.image_scraper.extract_key_topics(st.session_state.prompt) | |
| st.write("🤖 AI-detected keywords:", ", ".join(keywords)) | |
| image_categories = self.generator.image_scraper.get_images(st.session_state.prompt) | |
| selected_images = [] | |
| if image_categories and isinstance(image_categories, dict): | |
| # Select top images from each category based on relevance score | |
| for category, images in image_categories.items(): | |
| # Sort images by relevance score | |
| sorted_images = sorted(images, key=lambda x: x.get('relevance_score', 0), reverse=True) | |
| # Select top images from each category | |
| num_to_select = { | |
| 'primary': 3, | |
| 'secondary': 2, | |
| 'general': 1 | |
| }.get(category.lower(), 1) | |
| selected = [img['url'] for img in sorted_images[:num_to_select]] | |
| selected_images.extend(selected) | |
| if not selected_images: | |
| # Fallback to stock images if no images were selected | |
| selected_images = self.generator.image_scraper.get_stock_images()[:6] | |
| st.session_state.selected_images = selected_images | |
| except Exception as e: | |
| st.error(f"Error in image selection: {str(e)}") | |
| # Fallback to stock images | |
| st.session_state.selected_images = self.generator.image_scraper.get_stock_images()[:6] | |
| def show_image_selection(self): | |
| st.subheader("AI-Selected Images for Your Video") | |
| # Display back button | |
| if st.button("← Back to Script"): | |
| st.session_state.current_step = 'input' | |
| st.rerun() | |
| try: | |
| with st.spinner("AI analyzing script and selecting relevant images..."): | |
| keywords = self.generator.image_scraper.extract_key_topics(st.session_state.prompt) | |
| st.write("🤖 AI-detected keywords:", ", ".join(keywords)) | |
| image_categories = self.generator.image_scraper.get_images(st.session_state.prompt) | |
| if image_categories and isinstance(image_categories, dict): | |
| # Create columns for image selection | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| selected_images = [] | |
| for category, images in image_categories.items(): | |
| if images: | |
| st.subheader(f"{category.title()} Images") | |
| selected = self.display_image_grid(images) | |
| selected_images.extend(selected) | |
| # Update session state with selected images | |
| st.session_state.selected_images = selected_images | |
| with col2: | |
| # Show selection summary | |
| st.subheader("Selection Summary") | |
| selected_count = len(st.session_state.selected_images) | |
| st.write(f"Selected: {selected_count} images") | |
| if selected_count > 0: | |
| if st.button("Continue to Video Generation →", type="primary"): | |
| st.session_state.current_step = 'video_generation' | |
| st.rerun() | |
| else: | |
| st.warning("Please select at least one image") | |
| else: | |
| st.warning("No images found. Please try a different prompt.") | |
| except Exception as e: | |
| st.error(f"An error occurred: {str(e)}") | |
| print(f"Error in image selection: {str(e)}") | |
| def display_image_grid(self, images: List[Dict[str, str]], cols: int = 3) -> List[str]: | |
| """Display images in a grid and return selected image URLs""" | |
| selected_urls = [] | |
| # Initialize session state for image selection if not exists | |
| if 'image_selections' not in st.session_state: | |
| st.session_state.image_selections = {} | |
| n_images = len(images) | |
| n_rows = (n_images + cols - 1) // cols | |
| for row in range(n_rows): | |
| with st.container(): | |
| columns = st.columns(cols) | |
| for col in range(cols): | |
| idx = row * cols + col | |
| if idx < n_images: | |
| img = images[idx] | |
| with columns[col]: | |
| try: | |
| st.image(img['url'], use_container_width=True) | |
| # Create unique key for each checkbox | |
| checkbox_key = f"img_select_{row}_{col}_{hash(img['url'])}" | |
| # Pre-select images based on AI confidence score | |
| default_selected = img.get('relevance_score', 0) > 0.6 | |
| # Initialize checkbox state if not exists | |
| if checkbox_key not in st.session_state.image_selections: | |
| st.session_state.image_selections[checkbox_key] = default_selected | |
| # Create checkbox with persistent state | |
| if st.checkbox( | |
| f"Select (AI Confidence: {img.get('relevance_score', 0)*100:.1f}%)", | |
| key=checkbox_key, | |
| value=st.session_state.image_selections[checkbox_key] | |
| ): | |
| selected_urls.append(img['url']) | |
| # Show image metadata | |
| st.markdown( | |
| f"""<div style='font-size: 0.8em;'> | |
| Keywords: {img.get('keyword', 'N/A')}<br> | |
| Category: {img.get('category', 'General')} | |
| </div>""", | |
| unsafe_allow_html=True | |
| ) | |
| except Exception as e: | |
| st.error(f"Error displaying image: {str(e)}") | |
| return selected_urls | |
| def show_video_generation(self): | |
| st.subheader("Video Generation Settings") | |
| # Display back button | |
| if st.button("← Back to Image Selection"): | |
| st.session_state.current_step = 'image_selection' | |
| st.rerun() | |
| # Show selected images preview | |
| st.write("Selected Images:") | |
| cols = st.columns(4) | |
| for idx, img_url in enumerate(st.session_state.selected_images): | |
| with cols[idx % 4]: | |
| st.image(img_url, width=150) | |
| # Video settings | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| duration = st.slider( | |
| "Video duration (seconds)", | |
| min_value=30, | |
| max_value=180, | |
| value=60, | |
| step=30 | |
| ) | |
| background_music = st.selectbox( | |
| "Background Music", | |
| ["None", "Corporate", "Upbeat", "Inspirational", "Technology"] | |
| ) | |
| with col2: | |
| transition_style = st.selectbox( | |
| "Transition Style", | |
| ["Fade", "Slide", "Zoom", "None"] | |
| ) | |
| music_volume = st.slider( | |
| "Music Volume", | |
| min_value=0.0, | |
| max_value=1.0, | |
| value=0.3, | |
| step=0.1 | |
| ) | |
| # Advanced video effects | |
| with st.expander("Advanced Video Effects"): | |
| effect_col1, effect_col2 = st.columns(2) | |
| with effect_col1: | |
| zoom = st.slider( | |
| "Zoom Effect", | |
| min_value=1.0, | |
| max_value=1.5, | |
| value=1.0, | |
| step=0.1 | |
| ) | |
| brightness = st.slider( | |
| "Brightness", | |
| min_value=0.5, | |
| max_value=1.5, | |
| value=1.0, | |
| step=0.1 | |
| ) | |
| with effect_col2: | |
| contrast = st.slider( | |
| "Contrast", | |
| min_value=0.5, | |
| max_value=1.5, | |
| value=1.0, | |
| step=0.1 | |
| ) | |
| enable_blur = st.checkbox( | |
| "Enable Blur Effect", | |
| value=False | |
| ) | |
| # Add state management for video generation | |
| if 'video_generation_started' not in st.session_state: | |
| st.session_state.video_generation_started = False | |
| if 'video_path' not in st.session_state: | |
| st.session_state.video_path = None | |
| # Generate video button | |
| if st.button("🎬 Generate Video", type="primary") or st.session_state.video_generation_started: | |
| st.session_state.video_generation_started = True | |
| if not st.session_state.selected_images: | |
| st.error("No images selected. Please go back and select images.") | |
| st.session_state.video_generation_started = False | |
| return | |
| try: | |
| # Create progress containers | |
| progress_text = st.empty() | |
| progress_bar = st.progress(0) | |
| # Define video effects | |
| video_effects = { | |
| 'zoom': zoom, | |
| 'brightness': brightness, | |
| 'contrast': contrast, | |
| 'blur': enable_blur, | |
| 'transition_style': transition_style, | |
| 'subtitle_style': 'Modern', # Default value | |
| 'caption_position': 'Bottom', # Default value | |
| 'background_music': background_music, | |
| 'music_volume': music_volume | |
| } | |
| # Set up output path | |
| output_dir = "temp_videos" | |
| os.makedirs(output_dir, exist_ok=True) | |
| output_path = os.path.join(output_dir, f"vaultgenix_video_{int(time.time())}.mp4") | |
| # Progress callback | |
| def progress_callback(progress): | |
| progress_bar.progress(int(progress)) | |
| progress_text.text(f"Generating video: {int(progress)}% complete") | |
| time.sleep(0.1) # Add small delay to prevent UI freezing | |
| # Generate video with error handling | |
| if not st.session_state.video_path: | |
| for progress in range(0, 101, 5): | |
| progress_callback(progress) | |
| if progress == 25: # The point where it was failing before | |
| time.sleep(1) # Add extra delay at critical point | |
| if progress == 100: | |
| video_path = self.generator.create_video( | |
| st.session_state.prompt, | |
| st.session_state.video_style, | |
| duration, | |
| output_path, | |
| st.session_state.selected_images, | |
| video_effects | |
| ) | |
| st.session_state.video_path = video_path | |
| # Display video after generation | |
| if st.session_state.video_path and os.path.exists(st.session_state.video_path): | |
| st.success("✨ Video generated successfully!") | |
| # Display video | |
| with open(st.session_state.video_path, 'rb') as video_file: | |
| video_bytes = video_file.read() | |
| st.video(video_bytes) | |
| # Download options | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.download_button( | |
| label="⬇️ Download Video", | |
| data=video_bytes, | |
| file_name=os.path.basename(st.session_state.video_path), | |
| mime="video/mp4" | |
| ) | |
| with col2: | |
| if st.session_state.high_quality: | |
| st.download_button( | |
| label="⬇️ Download High Quality", | |
| data=video_bytes, | |
| file_name=f"high_quality_{os.path.basename(st.session_state.video_path)}", | |
| mime="video/mp4" | |
| ) | |
| # Reset generation state | |
| if st.button("Generate Another Video"): | |
| st.session_state.video_generation_started = False | |
| st.session_state.video_path = None | |
| st.rerun() | |
| except Exception as e: | |
| st.error(f"Error generating video: {str(e)}") | |
| logging.error(f"Video generation error: {str(e)}") | |
| st.session_state.video_generation_started = False | |
| st.session_state.video_path = None | |
| if __name__ == "__main__": | |
| ui = VideoGeneratorUI() |