Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from pathlib import Path | |
| import torch | |
| from transformers import ( | |
| GPT2LMHeadModel, | |
| GPT2Tokenizer, | |
| pipeline, | |
| AutoModelForCausalLM, | |
| AutoTokenizer | |
| ) | |
| from moviepy.editor import ( | |
| VideoFileClip, | |
| TextClip, | |
| CompositeVideoClip, | |
| AudioFileClip, | |
| ColorClip, | |
| vfx, | |
| concatenate_videoclips | |
| ) | |
| import requests | |
| import json | |
| from typing import Dict, List, Optional, Union | |
| import tempfile | |
| import os | |
| from dotenv import load_dotenv | |
| import time | |
| from datetime import datetime | |
| import nltk | |
| from tqdm import tqdm | |
| import pyttsx3 | |
| from gtts import gTTS | |
| import numpy as np | |
| from PIL import Image, ImageDraw, ImageFont | |
| import io | |
| import random | |
| # Set timeout for model downloads | |
| import socket | |
| socket.setdefaulttimeout(30) # 30 second timeout | |
| # Configure transformers to use smaller models and cache | |
| from transformers import logging | |
| logging.set_verbosity_error() # Reduce logging noise | |
| os.environ['TOKENIZERS_PARALLELISM'] = 'true' | |
| # Download NLTK data at startup | |
| try: | |
| nltk.data.find('tokenizers/punkt') | |
| nltk.data.find('taggers/averaged_perceptron_tagger') | |
| except LookupError: | |
| nltk.download('punkt', quiet=True) | |
| nltk.download('averaged_perceptron_tagger', quiet=True) | |
| # Load environment variables | |
| load_dotenv() | |
| def create_progress_tracker(): | |
| """Create a progress tracking system""" | |
| progress = st.progress(0) | |
| status = st.empty() | |
| def update(percentage: int, message: str): | |
| progress.progress(percentage) | |
| status.text(message) | |
| return update | |
| # Initialize models at module level for caching | |
| def load_models(): | |
| """Load all AI models with better error handling and timeouts""" | |
| models = {} | |
| try: | |
| # GPT-2 (smaller version) | |
| models['gpt2_model'] = GPT2LMHeadModel.from_pretrained('gpt2', | |
| low_cpu_mem_usage=True, | |
| torch_dtype=torch.float32) | |
| models['gpt2_tokenizer'] = GPT2Tokenizer.from_pretrained('gpt2') | |
| # Use smaller BLOOM model | |
| models['bloom_model'] = AutoModelForCausalLM.from_pretrained("bigscience/bloom-560m", | |
| low_cpu_mem_usage=True, | |
| torch_dtype=torch.float32) | |
| models['bloom_tokenizer'] = AutoTokenizer.from_pretrained("bigscience/bloom-560m") | |
| # Use smaller sentiment model | |
| models['sentiment_pipeline'] = pipeline('sentiment-analysis', | |
| model='distilbert-base-uncased-finetuned-sst-2-english', | |
| device=-1) # Force CPU usage | |
| return models | |
| except Exception as e: | |
| st.error(f"Error loading models: {str(e)}") | |
| return None | |
| class ContentStyle: | |
| def __init__( | |
| self, | |
| font_size: int = 40, | |
| font_color: str = "#FFFFFF", | |
| background_effect: str = "None", | |
| transition_effect: str = "fade", | |
| text_animation: str = "slide", | |
| theme: str = "modern", | |
| layout: str = "centered" | |
| ): | |
| self.font_size = font_size | |
| self.font_color = font_color | |
| self.background_effect = background_effect | |
| self.transition_effect = transition_effect | |
| self.text_animation = text_animation | |
| self.theme = theme | |
| self.layout = layout | |
| class AIContentEngine: | |
| def __init__(self, update_progress): | |
| self.update_progress = update_progress | |
| # Use only GPT-2 for faster generation | |
| self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2') | |
| self.model = GPT2LMHeadModel.from_pretrained('gpt2') | |
| # Move model to CPU explicitly | |
| self.model = self.model.to('cpu') | |
| # Set model to evaluation mode | |
| self.model.eval() | |
| def generate_with_timeout(self, prompt: str, max_length: int = 100, timeout: int = 30) -> str: | |
| """Generate content with timeout""" | |
| try: | |
| # Set up inputs | |
| inputs = self.tokenizer(prompt, return_tensors='pt', truncation=True, max_length=max_length) | |
| # Generate with basic parameters for speed | |
| with torch.no_grad(): # Disable gradient calculation | |
| outputs = self.model.generate( | |
| inputs['input_ids'], | |
| max_length=max_length, | |
| num_return_sequences=1, | |
| temperature=0.7, | |
| top_k=40, | |
| do_sample=True, | |
| pad_token_id=self.tokenizer.eos_token_id, | |
| attention_mask=inputs['attention_mask'] | |
| ) | |
| return self.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| except Exception as e: | |
| return f"{prompt} [Error: {str(e)}]" | |
| def initialize_models(self): | |
| """Initialize models with retries""" | |
| max_retries = 3 | |
| retry_count = 0 | |
| while retry_count < max_retries: | |
| try: | |
| self.update_progress(5 + retry_count * 5, f"Loading AI models (attempt {retry_count + 1})...") | |
| self.models = load_models() | |
| if self.models is not None: | |
| return | |
| retry_count += 1 | |
| time.sleep(2) # Wait before retrying | |
| except Exception as e: | |
| retry_count += 1 | |
| if retry_count == max_retries: | |
| raise Exception(f"Failed to load models after {max_retries} attempts: {str(e)}") | |
| time.sleep(2) | |
| def generate_dynamic_prompt(self, preferences: Dict) -> str: | |
| """Generate context-aware prompt based on detailed user preferences""" | |
| prompt_template = f""" | |
| Create {preferences['style']} content about {preferences['topic']} | |
| with a {preferences['tone']} tone. | |
| Target audience: {preferences['audience']}. | |
| Purpose: {preferences['purpose']}. | |
| Key message: {preferences['message']}. | |
| Content format: {preferences.get('format', 'General')}. | |
| Include specific examples and actionable insights. | |
| Make it engaging and memorable. | |
| """ | |
| return prompt_template.strip() | |
| def analyze_content_quality(self, content: str) -> float: | |
| """Advanced content quality analysis with multiple metrics""" | |
| score = 0.0 | |
| # Sentiment analysis | |
| sentiment = self.sentiment_pipeline(content)[0] | |
| score += sentiment['score'] if sentiment['label'] == 'POSITIVE' else 0 | |
| # Length analysis | |
| words = content.split() | |
| word_count = len(words) | |
| if 20 <= word_count <= 50: | |
| score += 0.3 | |
| # Complexity and readability | |
| tokens = nltk.word_tokenize(content) | |
| pos_tags = nltk.pos_tag(tokens) | |
| # Vocabulary diversity | |
| unique_words = len(set(words)) / len(words) | |
| score += unique_words * 0.3 | |
| # Sentence structure variety | |
| sentences = nltk.sent_tokenize(content) | |
| avg_sentence_length = np.mean([len(nltk.word_tokenize(sent)) for sent in sentences]) | |
| if 10 <= avg_sentence_length <= 20: | |
| score += 0.2 | |
| return score | |
| def generate_content_package(self, preferences: Dict) -> Dict[str, str]: | |
| """Generate content package with faster processing""" | |
| content_package = {} | |
| try: | |
| # Simplified prompt templates for faster generation | |
| templates = { | |
| 'main_content': f"Write a short post about {preferences['topic']} that is {preferences['tone']}.", | |
| 'quote': f"A short quote about {preferences['topic']}:", | |
| 'tips': f"Three quick tips about {preferences['topic']}:", | |
| 'call_to_action': f"Call to action for {preferences['topic']}:", | |
| 'hashtags': f"Trending hashtags for {preferences['topic']}:" | |
| } | |
| # Generate content with progress updates | |
| for i, (key, prompt) in enumerate(templates.items()): | |
| progress = 30 + (i * 10) # Progress from 30% to 80% | |
| self.update_progress(progress, f"Generating {key.replace('_', ' ')}...") | |
| # Generate with timeout | |
| content = self.generate_with_timeout(prompt, | |
| max_length=100 if key != 'main_content' else 200) | |
| content_package[key] = content | |
| # Add small delay between generations | |
| time.sleep(0.1) | |
| return content_package | |
| except Exception as e: | |
| st.error(f"Error in content generation: {str(e)}") | |
| return content_package if content_package else None | |
| def generate_with_models(self, prompt: str, max_length: int = 100) -> str: | |
| """Generate content with fallback options""" | |
| try: | |
| # Try GPT-2 first | |
| if 'gpt2_model' in self.models: | |
| inputs = self.models['gpt2_tokenizer'](prompt, return_tensors='pt', truncation=True) | |
| outputs = self.models['gpt2_model'].generate( | |
| inputs['input_ids'], | |
| max_length=max_length, | |
| num_return_sequences=1, | |
| temperature=0.7, | |
| top_k=50, | |
| top_p=0.95, | |
| do_sample=True | |
| ) | |
| return self.models['gpt2_tokenizer'].decode(outputs[0], skip_special_tokens=True) | |
| # Fallback to BLOOM if GPT-2 fails | |
| elif 'bloom_model' in self.models: | |
| inputs = self.models['bloom_tokenizer'](prompt, return_tensors="pt") | |
| outputs = self.models['bloom_model'].generate( | |
| inputs['input_ids'], | |
| max_length=max_length, | |
| num_return_sequences=1, | |
| temperature=0.7, | |
| do_sample=True | |
| ) | |
| return self.models['bloom_tokenizer'].decode(outputs[0], skip_special_tokens=True) | |
| else: | |
| raise Exception("No language models available") | |
| except Exception as e: | |
| st.warning(f"Error in content generation: {str(e)}") | |
| # Return a basic response if all else fails | |
| return prompt + " [Content generation failed, please try again]" | |
| class VideoGenerator: | |
| def __init__(self, update_progress): | |
| self.update_progress = update_progress | |
| self.temp_dir = tempfile.mkdtemp() | |
| self.pixabay_api_key = os.getenv("PIXABAY_API_KEY") | |
| def create_text_image(self, text: str, size=(1080, 1920)) -> np.ndarray: | |
| """Create text overlay using PIL with updated text measurement methods""" | |
| # Create image with PIL | |
| image = Image.new('RGBA', size, (0, 0, 128, 255)) # Navy blue background | |
| draw = ImageDraw.Draw(image) | |
| # Use a basic font | |
| try: | |
| font = ImageFont.truetype("/usr/share/fonts/dejavu/DejaVuSans-Bold.ttf", 40) | |
| except: | |
| # Fallback to default font if custom font not available | |
| font = ImageFont.load_default() | |
| # Word wrap text | |
| words = text.split() | |
| lines = [] | |
| current_line = [] | |
| for word in words: | |
| current_line.append(word) | |
| # Use textlength instead of textsize | |
| w = draw.textlength(' '.join(current_line), font=font) | |
| if w > size[0] - 100: # Leave margins | |
| if len(current_line) > 1: | |
| current_line.pop() | |
| lines.append(' '.join(current_line)) | |
| current_line = [word] | |
| else: | |
| lines.append(' '.join(current_line)) | |
| current_line = [] | |
| if current_line: | |
| lines.append(' '.join(current_line)) | |
| # Calculate text position | |
| y_position = size[1] // 2 - (len(lines) * 50) // 2 # Center text vertically | |
| # Draw each line | |
| for line in lines: | |
| # Use textbbox to get the text dimensions | |
| bbox = draw.textbbox((0, 0), line, font=font) | |
| text_width = bbox[2] - bbox[0] | |
| x_position = (size[0] - text_width) // 2 # Center text horizontally | |
| draw.text((x_position, y_position), line, fill='white', font=font) | |
| y_position += 50 | |
| return np.array(image) | |
| def fetch_background_video(self, query: str) -> Optional[str]: | |
| """Fetch background video from Pixabay with error handling""" | |
| try: | |
| url = f"https://pixabay.com/api/videos/?key={self.pixabay_api_key}&q={query}" | |
| response = requests.get(url) | |
| data = response.json() | |
| if data.get("hits"): | |
| video = random.choice(data["hits"]) | |
| video_url = video["videos"]["medium"]["url"] | |
| video_path = os.path.join(self.temp_dir, "background.mp4") | |
| response = requests.get(video_url) | |
| with open(video_path, "wb") as f: | |
| f.write(response.content) | |
| return video_path | |
| return None | |
| except Exception as e: | |
| st.error(f"Error fetching video: {str(e)}") | |
| return None | |
| def generate_voiceover(self, text: str, voice_type: str = "gtts") -> Optional[str]: | |
| """Generate voiceover with multiple options and error handling""" | |
| try: | |
| audio_path = os.path.join(self.temp_dir, "voiceover.mp3") | |
| if voice_type == "gtts": | |
| tts = gTTS(text=text, lang='en', tld='com') | |
| tts.save(audio_path) | |
| else: # pyttsx3 | |
| engine = pyttsx3.init() | |
| engine.save_to_file(text, audio_path) | |
| engine.runAndWait() | |
| return audio_path | |
| except Exception as e: | |
| st.error(f"Error generating voiceover: {str(e)}") | |
| return None | |
| def create_video(self, content: Dict[str, str], preferences: Dict, style: ContentStyle) -> Optional[str]: | |
| """Create video with PIL-based text rendering and proper MoviePy conversion""" | |
| try: | |
| self.update_progress(85, "Creating video...") | |
| # Create frames directory | |
| frames_dir = os.path.join(self.temp_dir, "frames") | |
| os.makedirs(frames_dir, exist_ok=True) | |
| # Create main content frame | |
| frame = self.create_text_image(content['main_content'][:200]) | |
| # Save frame as image | |
| frame_path = os.path.join(frames_dir, "frame.png") | |
| Image.fromarray(frame).save(frame_path) | |
| # Create video from frame | |
| output_path = os.path.join(self.temp_dir, f"output_{int(time.time())}.mp4") | |
| # Create video clip from image (not video file) | |
| # Use ImageClip instead of VideoFileClip for static images | |
| from moviepy.editor import ImageClip | |
| clip = ImageClip(frame_path).set_duration(15) # 15 seconds duration | |
| # Write video file | |
| self.update_progress(95, "Saving video...") | |
| clip.write_videofile(output_path, fps=24, codec='libx264', audio=False) | |
| # Cleanup | |
| clip.close() | |
| if os.path.exists(frame_path): | |
| os.remove(frame_path) | |
| return output_path | |
| except Exception as e: | |
| st.error(f"Error creating video: {str(e)}") | |
| return None | |
| finally: | |
| # Cleanup frames directory | |
| if os.path.exists(frames_dir): | |
| import shutil | |
| shutil.rmtree(frames_dir) | |
| def main(): | |
| st.set_page_config(page_title="Professional Content Generator", layout="wide") | |
| st.title("🎬 AI Content Generator") | |
| # Advanced User Preferences | |
| with st.form("content_preferences"): | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| topic = st.text_input("Main Topic", help="What's your content about?") | |
| style = st.selectbox( | |
| "Content Style", | |
| ["Motivational", "Educational", "Business", "Personal Development", | |
| "Technical", "Creative", "Storytelling", "News", "Tutorial"] | |
| ) | |
| tone = st.selectbox( | |
| "Tone", | |
| ["Inspiring", "Professional", "Casual", "Intense", "Friendly", | |
| "Authoritative", "Empathetic", "Humorous", "Serious"] | |
| ) | |
| with col2: | |
| audience = st.selectbox( | |
| "Target Audience", | |
| ["Entrepreneurs", "Students", "Professionals", "General Public", | |
| "Technical", "Creative", "Business Leaders", "Educators"] | |
| ) | |
| purpose = st.selectbox( | |
| "Content Purpose", | |
| ["Inspire", "Educate", "Motivate", "Transform", "Inform", | |
| "Entertain", "Persuade", "Guide", "Analyze"] | |
| ) | |
| message = st.text_input("Key Message", help="Core message to convey") | |
| with col3: | |
| mood = st.selectbox( | |
| "Visual Mood", | |
| ["Energetic", "Calm", "Professional", "Creative", "Modern", | |
| "Traditional", "Minimalist", "Bold", "Subtle"] | |
| ) | |
| voice_type = st.selectbox( | |
| "Voice Type", | |
| ["Natural", "Professional", "Friendly", "Authoritative", | |
| "Casual", "Energetic", "Calm", "Dynamic"] | |
| ) | |
| # Advanced options in expandable section | |
| with st.expander("Advanced Options"): | |
| col4, col5 = st.columns(2) | |
| with col4: | |
| font_size = st.slider("Font Size", 30, 70, 40) | |
| font_color = st.color_picker("Font Color", "#FFFFFF") | |
| text_animation = st.selectbox( | |
| "Text Animation", | |
| ["Fade", "Slide", "Static", "Bounce", "Zoom"] | |
| ) | |
| with col5: | |
| background_effect = st.selectbox( | |
| "Background Effect", | |
| ["None", "Zoom", "Blur", "Bright", "Contrast"] | |
| ) | |
| transition_effect = st.selectbox( | |
| "Transition Effect", | |
| ["None", "Fade", "Slide", "Dissolve", "Wipe"] | |
| ) | |
| theme = st.selectbox( | |
| "Visual Theme", | |
| ["Modern", "Classic", "Minimal", "Bold", "Corporate", "Creative"] | |
| ) | |
| layout = st.selectbox( | |
| "Content Layout", | |
| ["Centered", "Split", "Grid", "Dynamic", "Minimal"] | |
| ) | |
| submit_button = st.form_submit_button("Generate Content") | |
| if submit_button: | |
| if not topic or not message: | |
| st.error("Please fill in at least the topic and key message fields.") | |
| return | |
| # Initialize style configuration | |
| style_config = ContentStyle( | |
| font_size=font_size, | |
| font_color=font_color, | |
| background_effect=background_effect, | |
| transition_effect=transition_effect, | |
| text_animation=text_animation, | |
| theme=theme, | |
| layout=layout | |
| ) | |
| # Collect preferences before trying to use them | |
| preferences = { | |
| 'topic': topic, | |
| 'style': style, | |
| 'tone': tone, | |
| 'audience': audience, | |
| 'purpose': purpose, | |
| 'message': message, | |
| 'mood': mood, | |
| 'voice_type': voice_type | |
| } | |
| try: | |
| # Create progress tracker | |
| update_progress = create_progress_tracker() | |
| # Initialize engines with progress tracker | |
| update_progress(5, "Initializing AI engine...") | |
| ai_engine = AIContentEngine(update_progress) | |
| update_progress(15, "Initializing video generator...") | |
| video_gen = VideoGenerator(update_progress) | |
| # Generate content | |
| update_progress(25, "Starting content generation...") | |
| content_package = ai_engine.generate_content_package(preferences) | |
| # Create video | |
| update_progress(65, "Starting video creation...") | |
| video_path = video_gen.create_video(content_package, preferences, style_config) | |
| # Display results | |
| update_progress(100, "Complete!") | |
| if video_path and os.path.exists(video_path): | |
| try: | |
| video_file = open(video_path, 'rb') | |
| video_bytes = video_file.read() | |
| st.video(video_bytes) | |
| video_file.close() | |
| # Download button | |
| st.download_button( | |
| label="Download Video", | |
| data=video_bytes, | |
| file_name=f"content_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4", | |
| mime="video/mp4" | |
| ) | |
| finally: | |
| if 'video_file' in locals(): | |
| video_file.close() | |
| if os.path.exists(video_path): | |
| os.remove(video_path) | |
| except Exception as e: | |
| st.error(f"An error occurred: {str(e)}") | |
| if __name__ == "__main__": | |
| main() |