Spaces:
Running
Running
| import gradio as gr | |
| from transformers import pipeline | |
| from diffusers import StableDiffusionPipeline | |
| import torch | |
| from PIL import Image | |
| import numpy as np | |
| import os | |
| import tempfile | |
| import moviepy.editor as mpe | |
| import nltk | |
| from pydub import AudioSegment | |
| import warnings | |
| import asyncio | |
| import edge_tts | |
# Silence UserWarning noise from the libraries used below.
warnings.filterwarnings("ignore", category=UserWarning)

# Ensure the NLTK sentence-tokenizer data is downloaded.
# Older NLTK releases load the pickled "punkt" models, while newer releases
# load the "punkt_tab" resource inside sent_tokenize(); fetching both keeps
# sentence splitting working regardless of the installed NLTK version.
nltk.download('punkt')
nltk.download('punkt_tab')

# Initialize models: run on the GPU in half precision when CUDA is available,
# otherwise fall back to the CPU in full precision.
device = "cuda" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if device == "cuda" else torch.float32
# Story generator: GPT-2 (large) text-generation pipeline. The pipeline takes
# a device index, so CUDA maps to GPU 0 and anything else to -1 (CPU).
story_generator = pipeline(
    task='text-generation',
    model='gpt2-large',
    device=(0 if device == 'cuda' else -1),
)

# Stable Diffusion model, loaded with the dtype chosen above and then moved
# onto the selected device.
sd_model_id = "runwayml/stable-diffusion-v1-5"
sd_pipe = StableDiffusionPipeline.from_pretrained(
    sd_model_id, torch_dtype=torch_dtype
).to(device)
# Text-to-Speech function using edge_tts
def text2speech(text):
    """Synthesize *text* to speech and return the path of the audio file.

    Drives the ``_text2speech_async`` coroutine to completion with
    ``asyncio.run``. Any exception is printed and then re-raised unchanged.
    """
    try:
        return asyncio.run(_text2speech_async(text))
    except Exception as err:
        print(f"Error in text2speech: {str(err)}")
        raise
async def _text2speech_async(text):
    """Synthesize *text* with edge_tts and return the path of the MP3 file.

    The audio is written to a freshly created temporary ``.mp3`` file using
    the ``en-US-AriaNeural`` voice. The file is not deleted automatically on
    success; the caller owns it.

    Raises:
        Whatever ``edge_tts`` raises while saving. In that case the temporary
        file is removed before the exception propagates.
    """
    communicate = edge_tts.Communicate(text, voice="en-US-AriaNeural")

    # Only the unique file name is needed; close the handle before edge_tts
    # writes to that path.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        tmp_path = tmp_file.name

    try:
        await communicate.save(tmp_path)
    except BaseException:
        # Do not leave an orphaned temp file behind when synthesis fails
        # or the task is cancelled.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
    return tmp_path
def generate_story(prompt):
    """Generate a story from *prompt* with the text-generation pipeline.

    Requests a single sequence with ``max_length=500`` and returns the
    ``generated_text`` field of that sequence.
    """
    outputs = story_generator(prompt, num_return_sequences=1, max_length=500)
    first_sequence = outputs[0]
    return first_sequence['generated_text']
def split_story_into_sentences(story):
    """Return the sentences of *story* as produced by ``nltk.sent_tokenize``."""
    return nltk.sent_tokenize(story)
def generate_images(sentences):
    """Render one Stable Diffusion image per sentence.

    Each sentence is used as the prompt for ``sd_pipe``; the first image of
    the result is saved to a new temporary PNG file.

    Args:
        sentences: Iterable of prompt strings.

    Returns:
        List of PNG file paths, in the same order as *sentences*. The files
        are not deleted automatically.
    """
    image_paths = []
    for idx, sentence in enumerate(sentences):
        image = sd_pipe(sentence).images[0]
        # Reserve a unique file name and close the handle immediately.
        # Keeping it open leaks one file descriptor per image and means the
        # image is saved while another handle to the same file is open.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{idx}.png") as temp_file:
            image_path = temp_file.name
        image.save(image_path)
        image_paths.append(image_path)
    return image_paths
def generate_audio(story_text):
    """Narrate *story_text* and measure the narration length.

    Returns:
        A ``(audio_path, total_duration)`` tuple, where ``total_duration`` is
        the length of the generated audio in seconds.
    """
    narration_path = text2speech(story_text)
    narration = AudioSegment.from_file(narration_path)
    # The segment length is divided by 1000 to express it in seconds.
    seconds = len(narration) / 1000
    return narration_path, seconds
def compute_sentence_durations(sentences, total_duration):
    """Split *total_duration* across sentences in proportion to word count.

    Args:
        sentences: Sequence of sentence strings.
        total_duration: Total time to distribute (same unit as the result).

    Returns:
        A list with one duration per sentence, in order. When the sentences
        contain no words at all (for example only empty strings), the time is
        divided evenly instead of dividing by zero. An empty input yields an
        empty list.
    """
    word_counts = [len(sentence.split()) for sentence in sentences]
    total_words = sum(word_counts)

    if total_words == 0:
        # No words to weight by: fall back to an even split (or nothing).
        if not word_counts:
            return []
        even_share = total_duration / len(word_counts)
        return [even_share] * len(word_counts)

    return [total_duration * (count / total_words) for count in word_counts]
def create_video(images, durations, audio_path):
    """Assemble a narrated slideshow video and return its file path.

    Each image is shown for its matching duration (images and durations are
    paired with ``zip``), the clips are concatenated, and the audio at
    *audio_path* is attached as the soundtrack.

    Args:
        images: Paths of the image files, in display order.
        durations: Display duration for each image.
        audio_path: Path of the narration audio file.

    Returns:
        Path of the written ``.mp4`` file (1 fps, ``libx264``).

    Raises:
        ValueError: If there is no image/duration pair to build a video from.
    """
    clips = [
        mpe.ImageClip(image_path).set_duration(duration)
        for image_path, duration in zip(images, durations)
    ]
    if not clips:
        raise ValueError("Cannot create a video without at least one image.")

    video = None
    audio = None
    try:
        video = mpe.concatenate_videoclips(clips, method='compose')
        audio = mpe.AudioFileClip(audio_path)
        video = video.set_audio(audio)

        # Use a unique output file per call: a fixed name in the temp
        # directory would be overwritten by concurrent or later requests.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as out_file:
            output_path = out_file.name

        # Save video
        video.write_videofile(output_path, fps=1, codec='libx264')
        return output_path
    finally:
        # Release the readers/subprocesses held by the clips.
        for clip in (video, audio):
            if clip is not None:
                clip.close()
def process_pipeline(prompt, progress=gr.Progress(track_tqdm=True)):
    """Run the full prompt-to-video pipeline and return the video path.

    Steps: generate a story, split it into sentences, render an image per
    sentence, narrate the story, derive per-sentence durations from the
    narration length, and assemble the final video.

    Args:
        prompt: Text prompt used to seed the story.
        progress: Gradio progress tracker. It must stay a default argument so
            Gradio injects the live tracker for each request.

    Returns:
        Path of the generated video file.

    Raises:
        gr.Error: If any step fails; the message carries the original error.
    """
    total_steps = 6
    try:
        # gr.Progress is reported by calling it with a completion fraction.
        # It is not a context manager, and Progress.tqdm() requires an
        # iterable, so `with progress.tqdm(total=6) as pbar:` cannot work.
        progress(0 / total_steps, desc="Generating Story")
        story = generate_story(prompt)

        progress(1 / total_steps, desc="Splitting Story into Sentences")
        sentences = split_story_into_sentences(story)

        progress(2 / total_steps, desc="Generating Images for Sentences")
        images = generate_images(sentences)

        progress(3 / total_steps, desc="Generating Audio")
        audio_path, total_duration = generate_audio(story)

        progress(4 / total_steps, desc="Computing Durations")
        durations = compute_sentence_durations(sentences, total_duration)

        progress(5 / total_steps, desc="Creating Video")
        video_path = create_video(images, durations, audio_path)

        progress(1.0, desc="Creating Video")
        return video_path
    except Exception as e:
        print(f"Error in process_pipeline: {str(e)}")
        # Surface the failure in the UI while keeping the original traceback.
        raise gr.Error(f"An error occurred: {str(e)}") from e
# HTML header shown at the top of the app. The emoji after the heading text
# is the movie-camera character (U+1F3A5), replacing mis-decoded bytes.
title = """<h1 align="center">AI Story Video Generator 🎥</h1>
<p align="center">
Generate a story from a prompt, create images for each sentence, and produce a video with narration!
</p>
"""
# Build the UI: a header, then one row with the prompt controls on the left
# and the resulting video on the right.
demo = gr.Blocks(css=".container { max-width: 800px; margin: auto; }")
with demo:
    gr.HTML(title)
    with gr.Row():
        with gr.Column():
            prompt_input = gr.Textbox(label="Enter a Prompt", lines=2)
            generate_button = gr.Button("Generate Video")
        with gr.Column():
            video_output = gr.Video(label="Generated Video")

    # Clicking the button runs the whole pipeline and shows the video.
    generate_button.click(
        fn=process_pipeline,
        inputs=prompt_input,
        outputs=video_output,
    )

demo.launch(debug=True)