Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from pathlib import Path | |
| import torch | |
| from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer | |
| from PIL import Image, ImageDraw, ImageFont | |
| import tempfile | |
| import os | |
| from moviepy.editor import * | |
| import numpy as np | |
| from gtts import gTTS | |
| import textwrap | |
| from concurrent.futures import ThreadPoolExecutor | |
| import io | |
| import unicodedata | |
| import re | |
| import requests | |
| import random | |
| import logging | |
| import time | |
| from typing import Optional, List, Dict, Tuple | |
| from bs4 import BeautifulSoup | |
| import requests | |
| from io import BytesIO | |
| import docx | |
| import PyPDF2 | |
| import pptx | |
| import cv2 | |
| from PIL import ImageEnhance | |
class FileProcessor:
    """Extract plain text from document files.

    Every reader is a static method, so it can be called either on the class
    (``FileProcessor.read_txt(f)``) or on an instance.
    """

    @staticmethod
    def read_txt(file):
        """Return the contents of a binary file-like object decoded as UTF-8."""
        return file.read().decode('utf-8')

    @staticmethod
    def read_pdf(file):
        """Return the text of every page of a PDF, each page followed by a newline."""
        pdf_reader = PyPDF2.PdfReader(file)
        # A page that yields no text must not break the concatenation.
        return "".join(
            (page.extract_text() or "") + "\n" for page in pdf_reader.pages
        )

    @staticmethod
    def read_docx(file):
        """Return the text of every paragraph of a Word document, one per line."""
        doc = docx.Document(file)
        return "".join(para.text + "\n" for para in doc.paragraphs)

    @staticmethod
    def read_pptx(file):
        """Return the text of every text-bearing shape of a presentation, one per line."""
        prs = pptx.Presentation(file)
        return "".join(
            shape.text + "\n"
            for slide in prs.slides
            for shape in slide.shapes
            if hasattr(shape, "text")  # pictures and other shapes carry no text
        )
| class ImageScraper: | |
def __init__(self):
    """Prepare credentials, HTTP headers, a scratch directory and the keyword model.

    The Pixabay key may be supplied through the ``PIXABAY_API_KEY`` environment
    variable; the built-in value is kept only as a backward-compatible default.
    """
    # NOTE(review): a credential committed to source should be rotated and removed.
    self.PIXABAY_API_KEY = os.environ.get(
        "PIXABAY_API_KEY", "48069976-37e20099248207cee12385560"
    )
    self.headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    self.temp_dir = Path(tempfile.mkdtemp())
    # extract_keywords() calls the model with a list of candidate labels and
    # reads 'labels'/'scores' from the result, which is the interface of the
    # zero-shot pipeline (a plain text-classification pipeline has neither).
    try:
        self.keyword_model = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli",
            device=0 if torch.cuda.is_available() else -1
        )
    except Exception as e:
        # Degrade gracefully: keyword extraction then relies on heuristics only.
        print(f"Failed to load keyword model: {e}")
        self.keyword_model = None
def extract_keywords(self, text: str) -> List[Dict[str, str]]:
    """Return keyword records for ``text`` ordered by descending confidence.

    Each record is a dict with 'keyword', 'confidence' and 'category'.
    Labels scored above 0.3 by the keyword model (when one is loaded) are
    combined with adjective-led phrases at a fixed confidence of 0.5.  Any
    failure yields the preset fallback keywords instead.
    """
    label_pool = [
        "technology", "science", "education", "business",
        "health", "nature", "people", "urban", "abstract",
        "sports", "food", "travel", "architecture", "art",
        "music", "fashion", "medical", "industrial", "space",
        "environmental", "historical", "cultural", "professional"
    ]
    try:
        ranked = []
        if self.keyword_model:
            outcome = self.keyword_model(text, label_pool, multi_label=True)
            # Keep only labels the model is reasonably confident about.
            ranked.extend(
                {
                    'keyword': label,
                    'confidence': score,
                    'category': self.categorize_keyword(label)
                }
                for score, label in zip(outcome['scores'], outcome['labels'])
                if score > 0.3
            )
        # Heuristic phrases found directly in the text.
        ranked.extend(
            {
                'keyword': phrase,
                'confidence': 0.5,
                'category': 'content_specific'
            }
            for phrase in self.extract_noun_phrases(text)
        )
        ranked.sort(key=lambda entry: entry['confidence'], reverse=True)
        return ranked
    except Exception as e:
        print(f"Keyword extraction error: {e}")
        return self.get_fallback_keywords()
def extract_noun_phrases(self, text: str) -> List[str]:
    """Return the distinct two-word phrases that start with a trigger adjective.

    The text is lowercased and split on whitespace; every trigger adjective
    is paired with the word that follows it.  The order of the returned list
    is not defined.
    """
    trigger_words = {'digital', 'smart', 'modern', 'advanced', 'innovative',
                     'technical', 'professional', 'creative', 'strategic'}
    tokens = text.lower().split()
    unique_pairs = {
        f"{lead} {follower}"
        for lead, follower in zip(tokens, tokens[1:])
        if lead in trigger_words
    }
    return list(unique_pairs)
def categorize_keyword(self, keyword: str) -> str:
    """Map a keyword to the first theme whose marker words it contains.

    Matching is a case-insensitive substring test, themes are tried in the
    order listed, and 'general' is returned when nothing matches.
    """
    theme_vocab = (
        ('technical', ('technology', 'digital', 'software', 'computer', 'cyber')),
        ('scientific', ('science', 'research', 'laboratory', 'experiment')),
        ('business', ('business', 'professional', 'corporate', 'office')),
        ('educational', ('education', 'learning', 'teaching', 'academic')),
        ('creative', ('art', 'design', 'creative', 'innovation')),
    )
    lowered = keyword.lower()
    for theme, markers in theme_vocab:
        for marker in markers:
            if marker in lowered:
                return theme
    return 'general'
def extract_key_topics(self, script: str) -> List[str]:
    """Return up to five priority topics found in ``script``.

    Topics come from a fixed vocabulary of category terms and phrases.  Only
    topics mentioning 'digital', 'security', 'legacy' or 'AI' are kept; the
    list is padded with default topics when fewer than three are found.
    Results are returned in a stable, first-discovered order.

    Args:
        script: Free text to scan.

    Returns:
        At most five unique topic strings.  The three default topics are
        returned if the input cannot be processed.
    """
    default_topics = ['digital security', 'legacy management', 'data protection']
    try:
        categories = {
            'security': ['security', 'encryption', 'protection', 'privacy', 'safe', 'secure'],
            'digital': ['digital', 'online', 'virtual', 'cyber', 'electronic'],
            'legacy': ['legacy', 'inheritance', 'heir', 'posthumous', 'estate'],
            'management': ['management', 'planning', 'organization', 'control', 'administration'],
            'technology': ['AI', 'artificial intelligence', 'technology', 'platform', 'system'],
            'family': ['family', 'heir', 'custodian', 'relative', 'loved ones']
        }
        important_phrases = [
            'digital legacy',
            'legacy management',
            'digital security',
            'data protection',
            'artificial intelligence',
            'digital estate',
            'digital identity',
            'secure platform',
            'family protection',
            'digital inheritance'
        ]
        text = script.lower()

        def mentioned(term: str) -> bool:
            needle = term.lower()
            if term.isupper():
                # Acronyms such as 'AI' must match as a whole word; a plain
                # substring test would fire on words like "maintain".
                return re.search(rf"\b{re.escape(needle)}\b", text) is not None
            return needle in text

        found_topics: List[str] = []  # insertion-ordered and unique

        def remember(topic: str) -> None:
            if topic not in found_topics:
                found_topics.append(topic)

        for category, terms in categories.items():
            for term in terms:
                if mentioned(term):
                    remember(term)
                    remember(category)

        for phrase in important_phrases:
            if phrase in text:
                remember(phrase)

        # Pair leading modifiers with the core concepts that were also found.
        modifiers = ('digital', 'secure', 'smart', 'AI')
        core_concepts = ('legacy', 'security', 'protection', 'management')
        combined_topics = [
            f"{topic} {concept}"
            for topic in found_topics
            if topic in modifiers
            for concept in core_concepts
            if concept in found_topics
        ]
        for topic in combined_topics:
            remember(topic)

        priority_topics = [
            topic for topic in found_topics
            if any(key in topic for key in ('digital', 'security', 'legacy', 'AI'))
        ]
        # Pad so callers always get at least three topics to search for.
        if len(priority_topics) < 3:
            priority_topics.extend(default_topics[:3 - len(priority_topics)])
        return list(dict.fromkeys(priority_topics))[:5]
    except Exception as e:
        print(f"Topic extraction error: {e}")
        return list(default_topics)
def get_images_for_keyword(self, keyword: str) -> List[Dict[str, str]]:
    """Query Pixabay for landscape technology photos matching ``keyword``.

    A few known keywords are expanded into richer search terms first.

    Returns:
        One dict per hit with 'url', 'keyword', 'relevance' and 'tags', or an
        empty list when the request fails or nothing is found.
    """
    enhanced_keywords = {
        'digital': 'digital technology security',
        'security': 'cybersecurity protection',
        'legacy': 'digital legacy inheritance',
        'management': 'digital management system',
        'AI': 'artificial intelligence technology',
        'protection': 'data protection security'
    }
    try:
        search_term = enhanced_keywords.get(keyword, keyword)
        params = {
            'key': self.PIXABAY_API_KEY,
            'q': search_term,
            'image_type': 'photo',
            'per_page': 5,
            # A Python bool would be serialised as "True"; send the lowercase
            # form (presumably what the API expects - confirm against its docs).
            'safesearch': 'true',
            'lang': 'en',
            'category': 'technology',  # Focus on technology category
            'orientation': 'horizontal'  # Better for video
        }
        # Bounded wait so a stalled connection cannot hang the caller.
        response = requests.get(
            "https://pixabay.com/api/",
            params=params,
            headers=self.headers,
            timeout=10
        )
        if response.status_code != 200:
            return []
        hits = response.json().get('hits') or []
        keyword_lower = keyword.lower()
        return [{
            'url': img['largeImageURL'],
            'keyword': keyword,
            'relevance': 'Primary match' if keyword_lower in img['tags'].lower() else 'Related',
            'tags': img['tags']
        } for img in hits]
    except Exception as e:
        print(f"Error fetching images for keyword {keyword}: {e}")
        return []
def get_pixabay_images(self, query: str) -> List[str]:
    """Return large-image URLs from Pixabay for ``query``.

    Falls back to the preset stock image URLs when the request fails, the
    API answers with a non-200 status, or no hits are returned.
    """
    try:
        # requests URL-encodes parameters itself; replacing spaces with '+'
        # beforehand would be encoded again and sent as a literal plus sign.
        clean_query = query.strip()
        params = {
            'key': self.PIXABAY_API_KEY,
            'q': clean_query,
            'image_type': 'photo',
            'per_page': 20,
            # Lowercase string instead of a Python bool, which would be
            # serialised as "True" (confirm accepted values against API docs).
            'safesearch': 'true',
            'lang': 'en'
        }
        response = requests.get(
            "https://pixabay.com/api/",
            params=params,
            headers=self.headers,
            timeout=10  # never wait indefinitely on the API
        )
        # Debug logging (the full request URL is not printed: it contains the API key)
        print(f"Pixabay query: {clean_query!r}")
        print(f"Response status: {response.status_code}")
        if response.status_code != 200:
            print(f"Pixabay API error: Status code {response.status_code}")
            return self.get_stock_images()

        data = response.json()
        print(f"Total hits: {data.get('totalHits', 0)}")
        hits = data.get('hits')
        if not hits:
            print("No images found in response")
            return self.get_stock_images()

        image_urls = [img['largeImageURL'] for img in hits]
        print(f"Found {len(image_urls)} images")
        return image_urls
    except Exception as e:
        print(f"Exception in get_pixabay_images: {str(e)}")
        return self.get_stock_images()
def get_stock_images(self) -> List[str]:
    """Provide the fixed list of ten stock photo URLs used when searches yield nothing."""
    fallback_urls = [
        "https://images.pexels.com/photos/60504/security-protection-anti-virus-software-60504.jpeg",
        "https://images.pexels.com/photos/5380642/pexels-photo-5380642.jpeg",
        "https://images.pexels.com/photos/2582937/pexels-photo-2582937.jpeg",
        "https://images.pexels.com/photos/7319074/pexels-photo-7319074.jpeg",
        "https://images.pexels.com/photos/4164418/pexels-photo-4164418.jpeg",
        "https://images.pexels.com/photos/3861969/pexels-photo-3861969.jpeg",
        "https://images.pexels.com/photos/5473298/pexels-photo-5473298.jpeg",
        "https://images.pexels.com/photos/4348401/pexels-photo-4348401.jpeg",
        "https://images.pexels.com/photos/8386440/pexels-photo-8386440.jpeg",
        "https://images.pexels.com/photos/5473950/pexels-photo-5473950.jpeg",
    ]
    return fallback_urls
def get_images(self, query: str, num_images: int = 15) -> Dict[str, List[Dict[str, str]]]:
    """Collect, score and bucket images for ``query``.

    Topics are extracted from the query and scored, images are fetched per
    topic, and each image gets ``relevance_score`` = topic score multiplied
    by its tag relevance.  The best ``num_images`` images are split into
    'primary' (half), 'secondary' (a third) and 'general' (the rest).

    Returns:
        Dict with keys 'primary', 'secondary' and 'general'.  When nothing
        is found, or any step fails, 'general' holds the preset stock images.
    """
    def stock_fallback() -> Dict[str, List[Dict[str, str]]]:
        # Same record shape as a real hit so callers need no special case.
        return {
            'primary': [],
            'secondary': [],
            'general': [{
                'url': url,
                'keyword': 'technology',
                'relevance': 'Fallback',
                'tags': 'technology',
                'relevance_score': 0.5
            } for url in self.get_stock_images()[:max(num_images, 0)]]
        }

    try:
        keywords = self.extract_key_topics(query)
        print(f"AI extracted keywords: {keywords}")

        keyword_scores = self.score_keywords(query, keywords)
        ranked_keywords = sorted(keyword_scores.items(), key=lambda x: x[1], reverse=True)

        all_images = []
        for keyword, score in ranked_keywords:
            for img in self.get_images_for_keyword(keyword):
                img['relevance_score'] = score * self.analyze_image_relevance(img, query)
                all_images.append(img)

        sorted_images = sorted(all_images, key=lambda x: x['relevance_score'], reverse=True)

        total_images = max(0, min(len(sorted_images), num_images))
        primary_count = total_images // 2
        secondary_count = total_images // 3
        result = {
            'primary': sorted_images[:primary_count],
            'secondary': sorted_images[primary_count:primary_count + secondary_count],
            'general': sorted_images[primary_count + secondary_count:total_images]
        }
        if not any(result.values()):
            return stock_fallback()
        return result
    except Exception as e:
        print(f"Error in get_images: {str(e)}")
        # The class has no get_fallback_images(); fall back to stock images
        # directly instead of raising AttributeError from the handler.
        return stock_fallback()
def score_keywords(self, query: str, keywords: List[str]) -> Dict[str, float]:
    """Score each keyword between 0 and 1 for its relevance to ``query``.

    A keyword earns 0.3 per word it shares with the query plus a fixed
    weight for every context term it mentions; the total is capped at 1.0.

    Returns:
        Mapping of each keyword (as given) to its capped score.
    """
    # Stored lowercase because they are compared with the lowercased keyword;
    # an uppercase 'AI' entry could never match.
    context_terms = {
        'digital': 0.8,
        'security': 0.7,
        'legacy': 0.9,
        'protection': 0.6,
        'management': 0.5,
        'ai': 0.8,
        'technology': 0.6
    }
    query_words = set(query.lower().split())
    scores = {}
    for keyword in keywords:
        keyword_lower = keyword.lower()
        keyword_words = set(keyword_lower.split())
        score = len(keyword_words & query_words) * 0.3
        for term, weight in context_terms.items():
            # Very short terms must be whole words ('ai' is inside
            # "maintain"); longer ones may be substrings ("cybersecurity").
            if len(term) <= 2:
                matched = term in keyword_words
            else:
                matched = term in keyword_lower
            if matched:
                score += weight
        scores[keyword] = min(score, 1.0)  # Normalize to 0-1
    return scores
def analyze_image_relevance(self, image: Dict[str, str], query: str) -> float:
    """Score an image between 0 and 1 from its comma-separated tags.

    Each tag that equals a query word adds 0.2 and each tag that equals a
    context term adds that term's weight; the total is capped at 1.0.

    Args:
        image: Record whose 'tags' value is a comma-separated string.
        query: Free text the image should relate to.
    """
    relevant_terms = {
        'technology': 0.3,
        'digital': 0.3,
        'security': 0.3,
        'business': 0.2,
        'professional': 0.2,
        'modern': 0.1
    }
    # Strip each tag: with "a, b" a bare split(',') leaves " b", which can
    # never equal a query word or a context term.
    tags = {tag.strip() for tag in image['tags'].lower().split(',')}
    tags.discard('')
    query_words = set(query.lower().split())

    score = len(tags & query_words) * 0.2
    score += sum(weight for term, weight in relevant_terms.items() if term in tags)
    return min(score, 1.0)  # Normalize to 0-1
def get_fallback_keywords(self) -> List[Dict[str, str]]:
    """Provide the preset keyword records used when keyword extraction fails."""
    presets = (
        ('technology', 1.0, 'technical'),
        ('business', 0.8, 'business'),
        ('professional', 0.8, 'business'),
        ('digital', 0.7, 'technical'),
    )
    return [
        {'keyword': word, 'confidence': confidence, 'category': theme}
        for word, confidence, theme in presets
    ]
def verify_image_url(self, url: str) -> bool:
    """Return True when a HEAD request for ``url`` ends in HTTP 200.

    Redirects are followed so an image served behind one still counts as
    reachable.  Any request failure (bad URL, timeout, connection error)
    yields False.
    """
    try:
        response = requests.head(url, timeout=5, allow_redirects=True)
    except requests.RequestException:
        # Only network/URL failures are swallowed; a bare except would also
        # trap KeyboardInterrupt and SystemExit.
        return False
    return response.status_code == 200
def generate_fallback_audio(self, script: str) -> AudioFileClip:
    """Synthesize ``script`` with gTTS, or return silence if that fails.

    Returns:
        An audio clip of the spoken script.  When gTTS fails, a silent
        stereo clip lasting 0.3 seconds per word of the script.
    """
    try:
        audio_path = self.temp_dir / "voice.mp3"
        tts = gTTS(text=script, lang='en', slow=False)
        tts.save(str(audio_path))
        return AudioFileClip(str(audio_path))
    except Exception as e:
        print(f"Fallback audio generation failed: {e}")
        duration = len(script.split()) * 0.3

        def silence(t):
            # The frame function may be asked for one instant or an array of
            # instants; return one zeroed stereo sample per instant.
            moments = np.asarray(t)
            if moments.ndim == 0:
                return np.zeros(2)
            return np.zeros((moments.shape[0], 2))

        # AudioFileClip needs a real file, so silence is built as a generated
        # clip instead of AudioFileClip(duration=...), which raises.
        silent_clip = AudioClip(silence, duration=duration)
        silent_clip.fps = 44100
        return silent_clip
def scrape_pexels(self, query: str) -> List[str]:
    """Scrape the Pexels search page for ``query`` and return photo URLs.

    Returns:
        The ``src`` of every ``<img>`` that has a ``data-image-width``
        attribute and whose source contains 'photos'; an empty list (or the
        URLs gathered so far) if the request or parsing fails.
    """
    from urllib.parse import quote

    urls = []
    try:
        # Percent-encode the whole query, not just spaces, so characters such
        # as '/', '?' or '#' cannot alter the request path.
        url = f"https://www.pexels.com/search/{quote(query, safe='')}/"
        response = requests.get(url, headers=self.headers, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        for img in soup.find_all('img', {'data-image-width': True}):
            src = img.get('src')
            if src and 'photos' in src:
                urls.append(src)
    except Exception as e:
        print(f"Pexels scraping error: {e}")
    return urls
def scrape_unsplash(self, query: str) -> List[str]:
    """Scrape the Unsplash search page for ``query`` and return image URLs.

    Returns:
        The ``src`` of every ``<img>`` that has a ``srcset`` attribute and
        is served from images.unsplash.com; an empty list (or the URLs
        gathered so far) if the request or parsing fails.
    """
    from urllib.parse import quote

    urls = []
    try:
        # The search path uses hyphen-joined words; percent-encode the rest so
        # characters such as '/', '?' or '#' cannot alter the request path.
        slug = quote(query.replace(' ', '-'), safe='')
        url = f"https://unsplash.com/s/photos/{slug}"
        response = requests.get(url, headers=self.headers, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        for img in soup.find_all('img', {'srcset': True}):
            src = img.get('src')
            if src and 'images.unsplash.com' in src:
                urls.append(src)
    except Exception as e:
        print(f"Unsplash scraping error: {e}")
    return urls
| class EnhancedVideoGenerator: | |
def __init__(self):
    """Configure credentials and run every setup step of the generator.

    Credentials are read from the environment first (``ELEVEN_LABS_API_KEY``,
    ``AZURE_SPEECH_KEY``, ``AZURE_REGION``); the built-in ElevenLabs key is
    kept only as a backward-compatible default.

    Raises:
        RuntimeError: If any setup step fails; the original exception is
            attached as the cause.
    """
    # NOTE(review): a credential committed to source should be rotated and removed.
    self.ELEVEN_LABS_API_KEY = os.environ.get(
        "ELEVEN_LABS_API_KEY",
        "sk_acdad9d2d82d504bddbe5ed4aa290ca772c106aed5b128ba"
    )
    # generate_voice_over() reads these for its Azure branch; None means
    # "not configured" rather than a missing attribute.
    self.AZURE_SPEECH_KEY = os.environ.get("AZURE_SPEECH_KEY")
    self.AZURE_REGION = os.environ.get("AZURE_REGION")
    try:
        self.setup_logging()
        self.setup_device()
        self.initialize_models()
        self.setup_workspace()
        self.load_assets()
        self.setup_themes()
        self.image_scraper = ImageScraper()
    except Exception as e:
        logging.error(f"Initialization failed: {str(e)}")
        raise RuntimeError("Failed to initialize video generator") from e
def setup_logging(self):
    """Route INFO-level logs to 'video_generator.log' and the console, and create ``self.logger``."""
    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    sinks = [
        logging.FileHandler('video_generator.log'),
        logging.StreamHandler(),
    ]
    logging.basicConfig(level=logging.INFO, format=log_format, handlers=sinks)
    self.logger = logging.getLogger(__name__)
def setup_device(self):
    """Select 'cuda' when a CUDA device is available, otherwise 'cpu', and log the choice."""
    if torch.cuda.is_available():
        self.device = "cuda"
    else:
        self.device = "cpu"
    self.logger.info(f"Using device: {self.device}")
def initialize_models(self):
    """Load the GPT-2 text generator and reset the image-model placeholders.

    A failure to load the text generator is logged as a warning and leaves
    ``self.text_generator`` as None.  ``self.image_model`` and
    ``self.stability_api`` are always set to None.  Any other failure is
    logged and suppressed so the generator can run with reduced features.
    """
    try:
        try:
            # 0 selects the first GPU, -1 selects the CPU.
            generator = pipeline(
                'text-generation',
                model='gpt2',
                device=0 if self.device == "cuda" else -1
            )
        except Exception as e:
            self.logger.warning(f"Text generator initialization failed: {str(e)}")
            generator = None
        self.text_generator = generator
        # No image model or Stability API client is created here.
        self.image_model = None
        self.stability_api = None
    except Exception as e:
        # Deliberately not re-raised: run with degraded functionality.
        self.logger.error(f"Model initialization failed: {str(e)}")
def setup_workspace(self):
    """Create a fresh temporary directory with an 'assets' sub-directory inside it."""
    scratch_root = Path(tempfile.mkdtemp())
    self.temp_dir = scratch_root
    self.asset_dir = scratch_root / "assets"
    self.asset_dir.mkdir(exist_ok=True)
def setup_themes(self):
    """Define the RGB palette (background, accent, text) of each visual theme."""
    palette_rows = (
        # name, background, accent, text
        ('Professional', (240, 240, 240), (0, 120, 212), (33, 33, 33)),
        ('Creative', (255, 250, 240), (255, 123, 0), (51, 51, 51)),
        ('Educational', (248, 249, 250), (40, 167, 69), (33, 37, 41)),
    )
    self.themes = {
        name: {'bg': background, 'accent': accent, 'text': text}
        for name, background, accent, text in palette_rows
    }
def load_assets(self):
    """Load a 40pt TrueType font into ``self.font``.

    The candidate paths are tried in order and the first that loads wins.
    If none loads, the PIL default font is used and a warning is logged.
    Unexpected failures are logged as errors.
    """
    try:
        candidates = (
            "arial.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            "/System/Library/Fonts/Helvetica.ttc",
        )
        chosen = None
        for candidate in candidates:
            try:
                chosen = ImageFont.truetype(candidate, 40)
            except OSError:
                # Font file not present on this system; try the next one.
                continue
            break
        if chosen is not None:
            self.font = chosen
        else:
            self.font = ImageFont.load_default()
            self.logger.warning("Using default font - custom font loading failed")
    except Exception as e:
        self.logger.error(f"Asset loading failed: {str(e)}")
def generate_visual_assets(self, script: str, style: str) -> List[Dict]:
    """Create one AI-generated image asset per key topic of ``script``.

    Topics for which no image could be generated are skipped.  Any failure
    is logged and results in an empty list.

    NOTE(review): a second ``generate_visual_assets`` is defined further
    down in this class; being defined later, that one replaces this one.
    """
    try:
        subjects = self.extract_key_topics(script)
        collected = []
        for subject in subjects:
            picture = self.generate_ai_image(subject, style)
            if not picture:
                continue
            collected.append({
                'type': 'image',
                'data': picture,
                'topic': subject
            })
        return collected
    except Exception as e:
        self.logger.error(f"Visual asset generation failed: {str(e)}")
        return []
def create_enhanced_frame(
    self,
    text: str,
    theme: dict,
    frame_number: int,
    total_frames: int,
    background_image: Optional[Image.Image] = None,
    size: Tuple[int, int] = (1920, 1080)  # Upgraded to 1080p
) -> np.ndarray:
    """Render one frame: background, light overlay, caption and progress bar.

    Args:
        text: Caption drawn in a dark box near the bottom of the frame.
        theme: Palette dict; 'bg' is used for the plain background and the
            whole dict is passed on to the progress bar.
        frame_number: Index of this frame, used for the progress bar.
        total_frames: Total number of frames, used for the progress bar.
        background_image: Optional picture resized to ``size``; a plain
            'bg'-coloured background is used when omitted.
        size: Frame size as (width, height).

    Returns:
        An RGB frame of shape (height, width, 3).  If rendering fails, a
        plain 'bg'-coloured frame of the same shape.
    """
    try:
        if background_image is not None:
            base = background_image.resize(size, Image.LANCZOS)
        else:
            base = Image.new('RGB', size, tuple(theme['bg']))

        # Wash the background with semi-transparent white.
        overlay = Image.new('RGBA', size, (255, 255, 255, 100))
        img = Image.alpha_composite(base.convert('RGBA'), overlay).convert('RGB')

        # Create the drawing context only now: alpha_composite returned a new
        # image, so a context made earlier would draw on a discarded one.
        # Drawing in 'RGBA' mode on the RGB image blends translucent fills.
        draw = ImageDraw.Draw(img, 'RGBA')

        text = self.clean_text(text)
        wrapped_text = textwrap.fill(text, width=50)

        # Centre the caption horizontally and anchor it near the bottom.
        text_bbox = draw.textbbox((0, 0), wrapped_text, font=self.font)
        text_width = text_bbox[2] - text_bbox[0]
        text_height = text_bbox[3] - text_bbox[1]
        text_x = (size[0] - text_width) // 2
        text_y = size[1] - text_height - 100

        padding = 20
        draw.rectangle(
            [
                text_x - padding,
                text_y - padding,
                text_x + text_width + padding,
                text_y + text_height + padding
            ],
            fill=(0, 0, 0, 160)  # Semi-transparent black
        )
        draw.text(
            (text_x, text_y),
            wrapped_text,
            fill=(255, 255, 255, 255),
            font=self.font
        )

        self.draw_animated_progress_bar(
            draw,
            frame_number,
            total_frames,
            size,
            theme
        )
        # Three channels, matching the fallback frame below.
        return np.array(img)
    except Exception as e:
        self.logger.error(f"Frame creation failed: {str(e)}")
        return np.full((size[1], size[0], 3), theme['bg'], dtype=np.uint8)
def draw_animated_progress_bar(
    self,
    draw: ImageDraw.Draw,
    frame_number: int,
    total_frames: int,
    size: Tuple[int, int],
    theme: dict
):
    """Draw a progress bar with a fading fill near the bottom of the frame.

    The bar spans 80% of the frame width, 40 pixels above the bottom edge.
    Progress is ``frame_number / total_frames`` clamped to 0..1; a
    non-positive ``total_frames`` counts as no progress.  Drawing errors are
    logged, never raised.

    Args:
        draw: Drawing context providing ``rectangle`` and ``line``.
        frame_number: Index of the current frame.
        total_frames: Total number of frames in the video.
        size: Frame size as (width, height).
        theme: Palette dict; 'accent' provides the fill colour.
    """
    try:
        if total_frames > 0:
            # Clamp so an out-of-range frame cannot draw outside the bar.
            progress = min(max(frame_number / total_frames, 0.0), 1.0)
        else:
            progress = 0.0
        bar_width = int(size[0] * 0.8)  # 80% of screen width
        bar_height = 6
        x_offset = (size[0] - bar_width) // 2
        y_position = size[1] - 40

        # Background track
        draw.rectangle(
            [x_offset, y_position, x_offset + bar_width, y_position + bar_height],
            fill=(200, 200, 200, 160)
        )

        # Filled part: one vertical line per pixel, more opaque to the right.
        accent = tuple(theme['accent'][:3])
        progress_width = int(bar_width * progress)
        for x in range(progress_width):
            alpha = int(255 * (x / bar_width))
            draw.line(
                [x_offset + x, y_position, x_offset + x, y_position + bar_height],
                fill=accent + (alpha,)
            )

        # Bright marker at the leading edge, omitted once the bar is full.
        highlight_pos = x_offset + progress_width
        if highlight_pos < x_offset + bar_width:
            draw.rectangle(
                [highlight_pos - 2, y_position - 1, highlight_pos + 2, y_position + bar_height + 1],
                fill=(255, 255, 255, 200)
            )
    except Exception as e:
        self.logger.error(f"Progress bar drawing failed: {str(e)}")
def generate_voice_over(self, script: str) -> AudioFileClip:
    """Synthesize narration for ``script`` and return it as an audio clip.

    ElevenLabs is tried first.  If it does not answer with HTTP 200, Azure
    TTS is used when it is configured.  Any failure along the way falls back
    to ``generate_fallback_audio``.

    Returns:
        The narration read from 'voice.mp3' in the temp directory, or
        whatever the fallback generator returns.
    """
    try:
        audio_path = self.temp_dir / "voice.mp3"
        headers = {
            "xi-api-key": self.ELEVEN_LABS_API_KEY,
            "Content-Type": "application/json"
        }
        data = {
            "text": script,
            "model_id": "eleven_monolingual_v1",
            "voice_settings": {
                "stability": 0.75,
                "similarity_boost": 0.75
            }
        }
        response = requests.post(
            "https://api.elevenlabs.io/v1/text-to-speech/21m00Tcm4TlvDq8ikWAM",
            headers=headers,
            json=data,
            timeout=60  # never block forever on the TTS service
        )
        if response.status_code == 200:
            with open(audio_path, "wb") as f:
                f.write(response.content)
        else:
            # Fallback to Azure TTS.
            # NOTE(review): the visible part of this file neither imports
            # `speechsdk` nor sets AZURE_SPEECH_KEY / AZURE_REGION, so check
            # explicitly rather than relying on a NameError/AttributeError.
            sdk = globals().get("speechsdk")
            azure_key = getattr(self, "AZURE_SPEECH_KEY", None)
            azure_region = getattr(self, "AZURE_REGION", None)
            if sdk is None or not azure_key or not azure_region:
                raise RuntimeError(
                    f"ElevenLabs returned status {response.status_code} "
                    "and Azure TTS is not configured"
                )
            speech_config = sdk.SpeechConfig(
                subscription=azure_key,
                region=azure_region
            )
            speech_config.speech_synthesis_voice_name = "en-US-JennyNeural"
            synthesizer = sdk.SpeechSynthesizer(speech_config=speech_config)
            result = synthesizer.speak_text_async(script).get()
            if result.reason != sdk.ResultReason.SynthesizingAudioCompleted:
                # Without this the missing file would only fail later, when
                # the clip is opened.
                raise RuntimeError("Azure speech synthesis did not complete")
            with open(audio_path, "wb") as f:
                f.write(result.audio_data)
        return AudioFileClip(str(audio_path))
    except Exception as e:
        print(f"Voice generation error: {e}")
        # Prefer a fallback defined on this object; otherwise use the one on
        # the image scraper, where the visible code defines it.
        fallback = getattr(self, "generate_fallback_audio", None)
        if fallback is None:
            fallback = self.image_scraper.generate_fallback_audio
        return fallback(script)
def generate_subtitles(self, script: str, duration: int) -> str:
    """Write an SRT subtitle file that spreads ``script`` evenly over ``duration``.

    Words are grouped into cues of roughly three seconds each (at least one
    word per cue) and timed in proportion to their word count.

    Args:
        script: Narration text; an empty script produces an empty file.
        duration: Length of the video in seconds; must be positive.

    Returns:
        Path of the written 'subtitles.srt' file as a string.

    Raises:
        ValueError: If ``duration`` is not positive.
    """
    if duration <= 0:
        raise ValueError("duration must be positive")

    words = script.split()
    words_per_second = len(words) / duration
    # At least one word per cue: a slow script would otherwise give a cue
    # size of 0, which is an invalid range step.
    words_per_subtitle = max(1, int(words_per_second * 3))  # 3 seconds per subtitle
    subtitle_path = self.temp_dir / "subtitles.srt"

    with open(subtitle_path, 'w', encoding='utf-8') as f:
        current_time = 0
        chunk_starts = range(0, len(words), words_per_subtitle)
        for cue_number, start in enumerate(chunk_starts, start=1):
            subtitle_words = words[start:start + words_per_subtitle]
            start_time = self.format_time(current_time)
            current_time += len(subtitle_words) / words_per_second
            end_time = self.format_time(current_time)
            f.write(f"{cue_number}\n")
            f.write(f"{start_time} --> {end_time}\n")
            f.write(f"{' '.join(subtitle_words)}\n\n")
    return str(subtitle_path)
| def format_time(seconds: float) -> str: | |
| hours = int(seconds // 3600) | |
| minutes = int((seconds % 3600) // 60) | |
| secs = int(seconds % 60) | |
| msecs = int((seconds - int(seconds)) * 1000) | |
| return f"{hours:02d}:{minutes:02d}:{secs:02d},{msecs:03d}" | |
def create_video(self, script: str, style: str, duration: int, output_path: str, selected_images: List[str]) -> str:
    """Render a narrated slideshow video and return the path it was written to.

    Steps: validate input, generate the voice-over, download and enhance the
    selected images, build 30 fps frames with fades and cross-fades, trim
    audio and video to the shorter of the two, and encode with libx264
    (retrying once with alternative audio settings).  Progress is reported
    through Streamlit widgets.

    Args:
        script: Narration text.
        style: Visual style; "Creative" and "Professional" apply image effects.
        duration: Target video length in seconds.
        output_path: File to write; its directory is created when needed.
        selected_images: URLs of the images to show.

    Returns:
        ``output_path`` once the file has been written.

    Raises:
        RuntimeError: On invalid input or any failure; the underlying
            exception is attached as the cause.
    """
    clip = final_clip = audio = None
    try:
        # Validate before touching the file system or creating UI widgets.
        if not output_path:
            raise ValueError("Output path cannot be empty")
        if not selected_images:
            raise ValueError("No images selected")
        fps = 30
        total_frames = int(duration * fps)
        if total_frames <= 0:
            raise ValueError("Duration must be positive")

        # Initialize progress tracking
        progress_bar = st.progress(0)
        status_text = st.empty()

        # A bare file name has an empty dirname, which makedirs rejects.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Generate voice-over with progress tracking
        status_text.text("Creating voice-over...")
        audio = self.generate_voice_over(script)
        progress_bar.progress(20)

        # Process images with effects
        status_text.text("Processing images with effects...")
        processed_images = []
        for img_url in selected_images:
            try:
                response = requests.get(img_url, timeout=10)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content))
                img = img.convert('RGB')
                # Apply image effects based on style
                if style == "Creative":
                    img = ImageEnhance.Contrast(img).enhance(1.2)
                    img = ImageEnhance.Brightness(img).enhance(1.1)
                elif style == "Professional":
                    img = ImageEnhance.Sharpness(img).enhance(1.3)
                img = img.resize((1920, 1080), Image.Resampling.LANCZOS)
                processed_images.append(img)
            except Exception as e:
                # A single bad image must not abort the whole video.
                print(f"Error processing image {img_url}: {e}")
                continue
        if not processed_images:
            # Otherwise the frame budget below would divide by zero.
            raise ValueError("None of the selected images could be downloaded or processed")
        progress_bar.progress(40)

        # Generate frames with transitions
        status_text.text("Creating frames with transitions...")
        frames = []
        frames_per_image = total_frames // len(processed_images)
        image_arrays = [np.array(img) for img in processed_images]

        frame_count = 0
        last_index = len(image_arrays) - 1
        for idx, img_array in enumerate(image_arrays):
            # The last image absorbs whatever frame budget is left.
            if idx == last_index:
                n_frames = total_frames - frame_count
            else:
                n_frames = min(frames_per_image, total_frames - frame_count)

            for frame_idx in range(n_frames):
                # Fade in over the first 15 frames, out over the last 15.
                alpha = 1.0
                if frame_idx < 15:
                    alpha = frame_idx / 15
                elif frame_idx > n_frames - 15:
                    alpha = (n_frames - frame_idx) / 15
                frame = img_array * alpha
                frames.append(frame.astype(np.uint8))
                frame_count += 1
                # Frame generation covers the 40%..70% range of the bar.
                progress = int(40 + (frame_count / total_frames * 30))
                progress_bar.progress(progress)

            # Cross-fade into the next image while frame budget remains.
            if idx < last_index:
                next_img_array = image_arrays[idx + 1]
                transition_frames = 15
                for t in range(transition_frames):
                    if frame_count < total_frames:
                        alpha = t / transition_frames
                        transition_frame = cv2.addWeighted(
                            img_array, 1 - alpha,
                            next_img_array, alpha, 0
                        )
                        frames.append(transition_frame)
                        frame_count += 1
        progress_bar.progress(70)

        # Create video with frames
        status_text.text("Compiling video...")
        clip = ImageSequenceClip(frames, fps=fps)

        # Trim whichever of audio/video is longer so both end together.
        audio_duration = audio.duration
        video_duration = len(frames) / fps
        if audio_duration > video_duration:
            audio = audio.subclip(0, video_duration)
        elif audio_duration < video_duration:
            clip = clip.subclip(0, audio_duration)
        final_clip = clip.set_audio(audio)

        # Write video; the temporary audio file goes into a cache directory.
        status_text.text("Saving video...")
        cache_dir = os.path.join(output_dir, ".cache")
        os.makedirs(cache_dir, exist_ok=True)
        try:
            final_clip.write_videofile(
                output_path,
                fps=fps,
                codec='libx264',
                audio_codec='aac',
                ffmpeg_params=['-pix_fmt', 'yuv420p'],
                temp_audiofile=os.path.join(cache_dir, "temp-audio.m4a"),
                verbose=False,
                logger=None
            )
        except Exception:
            # Attempt error recovery with alternative codec settings
            status_text.text("Attempting error recovery...")
            try:
                final_clip.write_videofile(
                    output_path,
                    fps=fps,
                    codec='libx264',
                    audio_codec='mp3',
                    verbose=False,
                    logger=None
                )
            except Exception as recovery_e:
                raise RuntimeError(
                    f"Video creation failed even with recovery attempt: {str(recovery_e)}"
                ) from recovery_e

        progress_bar.progress(100)
        status_text.text("Video generation complete!")
        return output_path
    except Exception as e:
        error_msg = f"Video creation failed: {str(e)}"
        print(error_msg)
        raise RuntimeError(error_msg) from e
    finally:
        # Release clip/audio resources whether or not rendering succeeded.
        try:
            for resource in (clip, final_clip, audio):
                if resource is not None:
                    resource.close()
        except Exception as e:
            print(f"Cleanup error: {e}")
def generate_visual_assets(self, script: str, style: str) -> List[Dict]:
    """Build one plain background slide per key topic found in the script.

    Args:
        script: Text handed to ``self.extract_key_topics``.
        style: Key into ``self.themes``; the theme's ``'bg'`` entry is used
            as the fill colour of every slide.

    Returns:
        A list of ``{'type', 'data', 'topic'}`` dicts (at most three), or an
        empty list if anything goes wrong; the failure is logged.
    """
    try:
        # Only the first three topics get a slide, to keep processing fast.
        leading_topics = self.extract_key_topics(script)[:3]
        # Flat colour backgrounds are used instead of AI-generated images.
        return [
            {
                'type': 'image',
                'data': Image.new('RGB', (1920, 1080), self.themes[style]['bg']),
                'topic': topic_name,
            }
            for topic_name in leading_topics
        ]
    except Exception as e:
        self.logger.error(f"Visual asset generation failed: {str(e)}")
        return []
def clean_text(text: str) -> str:
    """Clean and normalize text for display as plain ASCII.

    Typographic punctuation (dashes, curly quotes, ellipsis) is mapped to its
    ASCII look-alike *before* anything is stripped, so it survives instead of
    being silently deleted. Remaining characters are NFKD-normalised (which
    separates accents from their base letters) and everything that still is
    not ASCII is dropped.

    Args:
        text: Value to clean; non-string values are converted with ``str()``.

    Returns:
        The ASCII-only text with leading/trailing whitespace removed.
    """
    if not isinstance(text, str):
        text = str(text)
    # Explicit escapes keep the table unambiguous regardless of how the
    # source file itself is encoded or re-saved.
    replacements = {
        '\u2013': '-',    # en dash
        '\u2014': '-',    # em dash
        '\u201c': '"',    # left smart quote
        '\u201d': '"',    # right smart quote
        '\u2018': "'",    # left smart apostrophe
        '\u2019': "'",    # right smart apostrophe
        '\u2026': '...',  # ellipsis
    }
    # Must happen first: none of the dashes/quotes has an ASCII decomposition,
    # so the ASCII filter below would otherwise just remove them.
    text = text.translate(str.maketrans(replacements))
    # Normalize unicode characters (e.g. "e" + combining accent)
    text = unicodedata.normalize('NFKD', text)
    # Remove whatever is still non-ASCII
    text = text.encode('ascii', 'ignore').decode('ascii')
    return text.strip()
def generate_ai_image(self, prompt: str, style: str) -> Optional[Image.Image]:
    """Request a single 1920x1080 image from ``self.stability_api``.

    Args:
        prompt: Base text prompt.
        style: 'Professional', 'Creative' or 'Educational'; selects extra
            descriptive keywords appended to the prompt. Other values add
            no keywords.

    Returns:
        The first returned image opened with PIL, or ``None`` when no client
        is configured, the response is empty, or any error occurs (errors
        are logged).
    """
    try:
        client = self.stability_api
        if not client:
            return None

        # Style-specific keywords appended to the prompt
        style_keywords = {
            'Professional': "professional, corporate, clean, modern",
            'Creative': "artistic, vibrant, innovative, dynamic",
            'Educational': "clear, informative, academic, detailed"
        }.get(style, '')
        enhanced_prompt = f"{prompt}, {style_keywords}, high quality, 4k"

        results = client.generate(
            prompt=enhanced_prompt,
            samples=1,
            width=1920,
            height=1080
        )
        if not results or len(results) <= 0:
            return None

        raw_bytes = results[0].image
        return Image.open(io.BytesIO(raw_bytes))
    except Exception as e:
        self.logger.error(f"AI image generation failed: {str(e)}")
        return None
def cleanup(self):
    """Delete everything inside ``self.temp_dir`` and then the directory.

    A failure on a single entry is logged as a warning and the remaining
    entries are still processed; a failure of the sweep as a whole (for
    example the final ``rmdir``) is logged as an error. Nothing is raised.
    """
    import shutil

    try:
        for entry in self.temp_dir.glob('*'):
            try:
                if entry.is_file():
                    entry.unlink()
                elif entry.is_dir():
                    shutil.rmtree(entry)
            except Exception as e:
                self.logger.warning(f"Failed to delete {entry}: {str(e)}")
        # Succeeds only if every entry above was actually removed
        self.temp_dir.rmdir()
    except Exception as e:
        self.logger.error(f"Cleanup failed: {str(e)}")
def __enter__(self):
    """Enter a ``with`` block; the instance itself is the managed object."""
    return self
def __exit__(self, exc_type, exc_val, exc_tb):
    """Leave a ``with`` block: run ``cleanup()``.

    Returns ``None``, so an exception raised inside the block propagates.
    """
    self.cleanup()
    return None
| # Streamlit UI Class | |
class VideoGeneratorUI:
    """Streamlit front end: enter a script, pick images, generate a video.

    Streamlit re-executes the whole script on every widget interaction and a
    button is only "pressed" during the run that follows the click. The
    result of the script analysis is therefore kept in ``st.session_state``
    so the image grid and the video settings stay on screen while the user
    ticks checkboxes or presses "Generate Video".
    """

    # Session-state key holding the most recent script analysis.
    _ANALYSIS_KEY = 'script_analysis'

    def __init__(self):
        self.generator = EnhancedVideoGenerator()
        self.setup_ui()

    def setup_ui(self):
        """Render the page: styling, the script form and any stored analysis."""
        st.set_page_config(layout="wide")
        # Custom CSS
        st.markdown("""
        <style>
        .stApp {
            max-width: 1200px;
            margin: 0 auto;
        }
        .image-category {
            margin-top: 2rem;
            padding: 1rem;
            border-radius: 0.5rem;
            background: #f8f9fa;
        }
        .image-metadata {
            font-size: 0.8rem;
            color: #666;
            margin-top: 0.5rem;
        }
        .submit-btn {
            margin-top: 1rem;
            padding: 0.5rem 1rem;
        }
        </style>
        """, unsafe_allow_html=True)
        st.title("VaultGenix Video Generator")
        st.markdown("Create professional videos for your digital legacy management platform")

        with st.container():
            # Form for prompt submission
            with st.form(key='prompt_form'):
                prompt = st.text_area("Enter your video script", height=200)
                submit_button = st.form_submit_button(label='Analyze Script & Find Images')

            if submit_button and prompt:
                self._analyze_script(prompt)

            # Rendered on every rerun, not only right after the submit click,
            # otherwise the first checkbox click would wipe the image grid.
            analysis = st.session_state.get(self._ANALYSIS_KEY)
            if analysis:
                self._render_analysis(analysis)

    def _analyze_script(self, prompt: str) -> None:
        """Run keyword extraction and image search, storing the result."""
        with st.spinner("AI analyzing script and selecting relevant images..."):
            try:
                scraper = self.generator.image_scraper
                keywords = scraper.extract_key_topics(prompt)
                image_categories = scraper.get_images(prompt)
            except Exception as e:
                # Drop any stale analysis so old images are not shown for a
                # prompt that failed.
                if self._ANALYSIS_KEY in st.session_state:
                    del st.session_state[self._ANALYSIS_KEY]
                st.error(f"An error occurred: {str(e)}")
                print(f"Error in UI: {str(e)}")
                return
        st.session_state[self._ANALYSIS_KEY] = {
            'prompt': prompt,
            'keywords': list(keywords),
            'image_categories': image_categories,
        }

    def _render_analysis(self, analysis: Dict) -> None:
        """Show keywords, image grids and (once images are ticked) settings."""
        prompt = analysis['prompt']
        image_categories = analysis['image_categories']
        try:
            st.write("🤖 AI-detected keywords:", ", ".join(analysis['keywords']))

            # Store selections in session state
            if 'selected_images' not in st.session_state:
                st.session_state.selected_images = []

            if not (image_categories and isinstance(image_categories, dict)):
                st.warning("No images found. Please try a different prompt.")
                return

            # Display AI-selected primary matches first
            if image_categories.get('primary'):
                st.subheader("🎯 AI-Selected Most Relevant Images")
                self.display_image_grid(image_categories['primary'])
            # Display secondary matches
            if image_categories.get('secondary'):
                st.subheader("🔄 AI-Selected Related Images")
                self.display_image_grid(image_categories['secondary'])

            # Collect ticked images; dict.fromkeys de-duplicates a URL that
            # appears in more than one category while keeping display order.
            ticked_urls = [
                img['url']
                for category in image_categories.values()
                if isinstance(category, list)
                for img in category
                if st.session_state.get(f"img_{img['url']}", False)
            ]
            selected_images = list(dict.fromkeys(ticked_urls))
            st.session_state.selected_images = selected_images

            # Video generation section
            if selected_images:
                self.show_video_settings(prompt, selected_images)
            else:
                st.warning("Please select at least one image to generate the video.")
        except Exception as e:
            st.error(f"An error occurred: {str(e)}")
            print(f"Error in UI: {str(e)}")

    def display_image_grid(self, images: List[Dict[str, str]], cols: int = 3):
        """Display images in a grid with metadata and confidence scores.

        Args:
            images: Image records; each needs a ``'url'`` and may carry
                ``'relevance_score'``, ``'relevance'``, ``'keyword'``,
                ``'tags'`` and ``'category'``.
            cols: Number of grid columns.

        Each image gets a checkbox whose session-state key is
        ``img_<url>``. A failure while rendering one cell is printed and does
        not stop the remaining cells.
        """
        if not images or not isinstance(images, list):
            return

        import html  # local import: only needed for escaping the metadata

        n_images = len(images)
        n_rows = (n_images + cols - 1) // cols
        for row in range(n_rows):
            with st.container():
                columns = st.columns(cols)
                for col in range(cols):
                    idx = row * cols + col
                    if idx >= n_images:
                        continue
                    img = images[idx]
                    with columns[col]:
                        try:
                            st.image(img['url'], use_container_width=True)
                            keyword = str(img.get('keyword', 'N/A'))
                            tags = str(img.get('tags', 'N/A'))
                            # Add confidence score to checkbox label
                            confidence = (img.get('relevance_score') or 0) * 100
                            st.checkbox(
                                f"Select (AI Confidence: {confidence:.1f}%)",
                                key=f"img_{img['url']}",
                                help=f"Keywords: {keyword}\nTags: {tags}"
                            )
                            # Metadata comes from an external image service and
                            # is rendered as raw HTML, so it must be escaped.
                            relevance = html.escape(str(img.get('relevance', 'N/A')))
                            match_type = html.escape(str(img.get('category', 'General')))
                            st.markdown(
                                f"<div class='image-metadata'>"
                                f"<b>AI Relevance:</b> {relevance}<br>"
                                f"<b>Keywords:</b> {html.escape(keyword)}<br>"
                                f"<b>Match Type:</b> {match_type}"
                                f"</div>",
                                unsafe_allow_html=True
                            )
                        except Exception as e:
                            print(f"Error displaying image: {e}")

    def show_video_settings(self, prompt: str, selected_images: List[str]):
        """Show video generation settings and controls.

        Args:
            prompt: The script text passed on to the generator.
            selected_images: URLs of the images the user ticked.

        When the "Generate Video" button is pressed the video is written to
        ``temp_videos/`` and then shown with a download button; errors are
        reported in the UI and printed.
        """
        st.subheader("Video Settings")
        col1, col2 = st.columns(2)
        with col1:
            style = st.selectbox(
                "Choose style",
                options=["Professional", "Creative", "Educational"],
                index=0
            )
        with col2:
            duration = st.slider(
                "Video duration (seconds)",
                min_value=30,
                max_value=180,
                value=60,
                step=30
            )

        if not st.button("🎬 Generate Video", type="primary"):
            return
        if not selected_images:
            st.error("Please select at least one image before generating the video.")
            return

        try:
            output_dir = "temp_videos"
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, f"vaultgenix_video_{int(time.time())}.mp4")
            video_path = self.generator.create_video(
                prompt,
                style,
                duration,
                output_path,
                selected_images
            )
            if not os.path.exists(video_path):
                st.error("Video generation failed. Please try again.")
                return

            st.success("✨ Video generated successfully!")
            # Display video
            with open(video_path, 'rb') as video_file:
                video_bytes = video_file.read()
            st.video(video_bytes)
            # Download button
            st.download_button(
                label="⬇️ Download Video",
                data=video_bytes,
                file_name=os.path.basename(video_path),
                mime="video/mp4"
            )
        except Exception as e:
            st.error(f"Error generating video: {str(e)}")
            print(f"Video generation error: {str(e)}")  # For debugging
class VideoGenerator:
    """Slideshow video builder: downloaded images, caption text, gTTS audio.

    A temporary directory is created per instance for intermediate files;
    call ``cleanup()`` when done to remove it.
    """

    # Number of cross-fade frames inserted between two consecutive images.
    TRANSITION_FRAMES = 15

    def __init__(self):
        self.temp_dir = Path(tempfile.mkdtemp())
        self.setup_resources()

    def setup_resources(self):
        """Load the caption font, falling back to PIL's built-in default."""
        font_options = [
            "arial.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            "/System/Library/Fonts/Helvetica.ttc"
        ]
        try:
            for font_path in font_options:
                try:
                    self.font = ImageFont.truetype(font_path, 40)
                    break
                except OSError:
                    continue
            else:
                # None of the candidate fonts could be opened
                self.font = ImageFont.load_default()
        except Exception as e:
            print(f"Font loading error: {e}")
            self.font = ImageFont.load_default()

    @staticmethod
    def _fit_image(image, size):
        """Scale ``image`` to cover ``size`` and center-crop to exactly ``size``."""
        if image.size == tuple(size):
            return image
        src_w, src_h = image.size
        scale = max(size[0] / src_w, size[1] / src_h)
        # max() guards against float rounding leaving the image 1px short
        new_w = max(size[0], round(src_w * scale))
        new_h = max(size[1], round(src_h * scale))
        resized = image.resize((new_w, new_h), Image.LANCZOS)
        left = (new_w - size[0]) // 2
        top = (new_h - size[1]) // 2
        return resized.crop((left, top, left + size[0], top + size[1]))

    def create_video_frame(self, image, text, frame_number, total_frames, size=(1920, 1080)):
        """Render one frame: fitted image, caption box and progress bar.

        Args:
            image: PIL image used as the frame background.
            text: Caption; wrapped at 50 characters per line.
            frame_number: Index of this frame, used for the progress bar.
            total_frames: Total frame count, used for the progress bar.
            size: ``(width, height)`` of the frame in pixels.

        Returns:
            A ``(height, width, 3)`` uint8 array. If rendering fails the
            error is printed and an all-black frame of that shape is returned.
        """
        try:
            fitted = self._fit_image(image, size)
            # Always draw on a fresh RGB canvas so the source image (which is
            # reused for many frames) is never modified.
            frame = Image.new('RGB', size, (0, 0, 0))
            frame.paste(fitted, (0, 0))

            # 'RGBA' drawing mode makes the alpha component of fill colours
            # blend into the RGB frame instead of being ignored.
            draw = ImageDraw.Draw(frame, 'RGBA')

            # Caption geometry
            text = textwrap.fill(text, width=50)
            text_bbox = draw.textbbox((0, 0), text, font=self.font)
            text_width = text_bbox[2] - text_bbox[0]
            text_height = text_bbox[3] - text_bbox[1]
            text_x = (size[0] - text_width) // 2
            text_y = size[1] - text_height - 100

            # Semi-transparent background behind the caption
            padding = 20
            draw.rectangle(
                [
                    text_x - padding,
                    text_y - padding,
                    text_x + text_width + padding,
                    text_y + text_height + padding
                ],
                fill=(0, 0, 0, 180)
            )
            draw.text((text_x, text_y), text, fill=(255, 255, 255), font=self.font)

            self.draw_progress_bar(draw, frame_number, total_frames, size)
            return np.array(frame)
        except Exception as e:
            print(f"Frame creation error: {e}")
            # size is (width, height) but image arrays are (height, width, 3)
            return np.zeros((size[1], size[0], 3), dtype=np.uint8)

    def draw_progress_bar(self, draw, frame_number, total_frames, size):
        """Draw a thin progress bar near the bottom edge of the frame.

        Args:
            draw: Drawing context providing ``rectangle(box, fill=...)``.
            frame_number: Current frame index.
            total_frames: Total number of frames; ``0`` draws an empty bar.
            size: ``(width, height)`` of the frame in pixels.
        """
        progress = frame_number / total_frames if total_frames > 0 else 0.0
        progress = min(max(progress, 0.0), 1.0)
        bar_width = int(size[0] * 0.8)
        bar_height = 6
        x_offset = (size[0] - bar_width) // 2
        y_position = size[1] - 40
        # Background bar
        draw.rectangle(
            [x_offset, y_position, x_offset + bar_width, y_position + bar_height],
            fill=(100, 100, 100, 160)
        )
        # Progress bar
        progress_width = int(bar_width * progress)
        draw.rectangle(
            [x_offset, y_position, x_offset + progress_width, y_position + bar_height],
            fill=(255, 255, 255, 200)
        )

    def _download_images(self, urls, size):
        """Fetch each URL as an RGB image fitted to ``size``; skip failures."""
        fitted_images = []
        for img_url in urls:
            try:
                response = requests.get(img_url, timeout=30)
                response.raise_for_status()
                img = Image.open(BytesIO(response.content)).convert('RGB')
                fitted_images.append(self._fit_image(img, size))
            except Exception as e:
                print(f"Image processing error: {e}")
        return fitted_images

    def _build_frames(self, script, images, total_frames, size):
        """Render all frames: a run per image plus cross-fades in between."""
        image_count = len(images)
        frames_per_image = total_frames // image_count
        # Split the script into one consecutive word range per image
        words = script.split()
        words_per_image = len(words) // image_count

        frames = []
        for idx, img in enumerate(images):
            is_last = idx == image_count - 1
            start_idx = idx * words_per_image
            # The last image takes whatever words are left over
            end_idx = len(words) if is_last else start_idx + words_per_image
            section_text = ' '.join(words[start_idx:end_idx])

            for _ in range(frames_per_image):
                if len(frames) >= total_frames:
                    break
                frames.append(self.create_video_frame(
                    img, section_text, len(frames), total_frames, size
                ))

            if is_last:
                continue
            next_img = images[idx + 1]
            for step in range(self.TRANSITION_FRAMES):
                if len(frames) >= total_frames:
                    break
                # Both images were fitted to the same size and mode on
                # download, which Image.blend requires.
                blended = Image.blend(img, next_img, step / self.TRANSITION_FRAMES)
                frames.append(self.create_video_frame(
                    blended, section_text, len(frames), total_frames, size
                ))
        return frames

    def generate_video(self, script: str, images: List[str], duration: int, output_path: str) -> str:
        """Create a narrated slideshow video and write it to ``output_path``.

        Args:
            script: Narration text; also split across the images as captions.
            images: Image URLs to download; URLs that fail are skipped.
            duration: Target length in seconds (at 30 fps) before the video
                and audio are trimmed to the shorter of the two.
            output_path: Destination file; its directory is created if the
                path has one.

        Returns:
            ``output_path``.

        Raises:
            ValueError: If no image could be downloaded or no frame could be
                produced for the requested duration.
            Exception: Any other failure is printed and re-raised.
        """
        clip = audio_clip = final_clip = None
        try:
            # A bare filename has an empty dirname, and os.makedirs('') fails
            output_dir = os.path.dirname(output_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            frame_size = (1920, 1080)
            processed_images = self._download_images(images, frame_size)
            if not processed_images:
                raise ValueError("No valid images to process")

            fps = 30
            frames = self._build_frames(script, processed_images, duration * fps, frame_size)
            if not frames:
                raise ValueError("Duration too short to produce any frames")

            # Generate audio
            audio_path = self.temp_dir / "audio.mp3"
            gTTS(text=script, lang='en').save(str(audio_path))

            # Create video
            clip = ImageSequenceClip(frames, fps=fps)
            audio_clip = AudioFileClip(str(audio_path))
            # Trim whichever track is longer so both end together
            if audio_clip.duration < clip.duration:
                clip = clip.subclip(0, audio_clip.duration)
            else:
                audio_clip = audio_clip.subclip(0, clip.duration)
            final_clip = clip.set_audio(audio_clip)

            # Write video
            final_clip.write_videofile(
                output_path,
                fps=fps,
                codec='libx264',
                audio_codec='aac',
                ffmpeg_params=['-pix_fmt', 'yuv420p']
            )
            return output_path
        except Exception as e:
            print(f"Video generation error: {e}")
            raise
        finally:
            # Cleanup
            try:
                for resource in (clip, final_clip, audio_clip):
                    if resource is not None:
                        resource.close()
            except Exception as e:
                print(f"Cleanup error: {e}")

    def cleanup(self):
        """Remove the instance's temporary directory; errors are printed."""
        import shutil
        try:
            shutil.rmtree(self.temp_dir)
        except Exception as e:
            print(f"Cleanup error: {e}")
def create_ui():
    """Render the simple upload-and-generate page.

    The user may upload a TXT/PDF/DOCX/PPTX file whose text pre-fills the
    script box, edit the script, and press "Generate Video". The video is
    built from a fixed set of stock image URLs, written to ``temp_videos/``
    and offered for playback and download. Errors are shown in the UI and
    printed.
    """
    st.title("VaultGenix Video Generator")
    st.markdown("Create professional videos for your digital legacy management platform")

    # File upload section
    st.subheader("Upload Content")
    uploaded_file = st.file_uploader(
        "Upload your content (PDF, DOCX, PPTX, or TXT)",
        type=['pdf', 'docx', 'pptx', 'txt']
    )

    # Text input section
    script = ""
    if uploaded_file:
        # Reader method name by MIME type, with the file extension as a
        # fallback for browsers that report a generic MIME type.
        readers_by_mime = {
            "text/plain": "read_txt",
            "application/pdf": "read_pdf",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document": "read_docx",
            "application/vnd.openxmlformats-officedocument.presentationml.presentation": "read_pptx",
        }
        readers_by_suffix = {
            ".txt": "read_txt",
            ".pdf": "read_pdf",
            ".docx": "read_docx",
            ".pptx": "read_pptx",
        }
        try:
            suffix = os.path.splitext(uploaded_file.name or "")[1].lower()
            reader_name = (
                readers_by_mime.get(uploaded_file.type)
                or readers_by_suffix.get(suffix)
            )
            if reader_name is None:
                st.warning(f"Unsupported file type: {uploaded_file.type}")
            else:
                # Looked up on the class: the readers take only the file
                # object, so this works whether or not they are static.
                script = getattr(FileProcessor, reader_name)(uploaded_file)
        except Exception as e:
            st.error(f"Error processing file: {str(e)}")

    script = st.text_area("Enter or edit your video script", value=script, height=200)

    if st.button("Generate Video") and script:
        generator = None
        try:
            # Initialize video generator
            generator = VideoGenerator()
            # Get stock images (replace with your image selection logic)
            images = [
                "https://images.pexels.com/photos/60504/security-protection-anti-virus-software-60504.jpeg",
                "https://images.pexels.com/photos/5380642/pexels-photo-5380642.jpeg",
                "https://images.pexels.com/photos/2582937/pexels-photo-2582937.jpeg"
            ]
            # Per-run file inside a real directory: avoids sessions
            # overwriting each other's output and keeps the path's dirname
            # non-empty for generate_video.
            output_dir = "temp_videos"
            os.makedirs(output_dir, exist_ok=True)
            output_path = os.path.join(output_dir, f"vaultgenix_video_{int(time.time())}.mp4")

            # Generate video
            with st.spinner("Generating video..."):
                video_path = generator.generate_video(script, images, 30, output_path)

            # Display video
            if os.path.exists(video_path):
                st.success("Video generated successfully!")
                with open(video_path, 'rb') as video_file:
                    video_bytes = video_file.read()
                st.video(video_bytes)
                # Download button
                st.download_button(
                    label="Download Video",
                    data=video_bytes,
                    file_name="vaultgenix_video.mp4",
                    mime="video/mp4"
                )
        except Exception as e:
            st.error(f"Error generating video: {str(e)}")
            print(f"Error details: {str(e)}")
        finally:
            # Remove the generator's temporary directory (narration audio)
            if generator is not None:
                generator.cleanup()
# Script entry point: build the upload-and-generate page when this file is
# executed directly (e.g. via `streamlit run`), not when it is imported.
if __name__ == "__main__":
    create_ui()