| | import os |
| | import re |
| | import json |
| | import time |
| | import random |
| | import tempfile |
| | import requests |
| | import numpy as np |
| | import uuid |
| | from PIL import Image, ImageDraw, ImageFont |
| | from io import BytesIO |
| | from datetime import datetime |
| | import gradio as gr |
| | from dotenv import load_dotenv |
| | import moviepy.editor as mpy |
| | from moviepy.editor import * |
| | from moviepy.audio.fx.all import volumex |
| | from moviepy.video.fx.all import crop |
| |
|
| | |
# On Windows, force the selector-based event loop policy: the default
# Proactor loop breaks some async libraries used here (e.g. edge-tts).
import sys
if sys.platform.startswith('win'):
    import asyncio
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
| |
|
| | |
# Load API keys and other settings from a local .env file into os.environ.
load_dotenv()

# Directory layout: bundled assets (music, fonts) under static/,
# generated artifacts (images, audio, video) under storage/.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")
MUSIC_DIR = os.path.join(STATIC_DIR, "music")
FONTS_DIR = os.path.join(STATIC_DIR, "fonts")
STORAGE_DIR = os.path.join(BASE_DIR, "storage")

# Create all directories up front so later file writes cannot fail on a
# missing path.
os.makedirs(STATIC_DIR, exist_ok=True)
os.makedirs(MUSIC_DIR, exist_ok=True)
os.makedirs(FONTS_DIR, exist_ok=True)
os.makedirs(STORAGE_DIR, exist_ok=True)
| |
|
| | |
def info(message):
    """Print and return an INFO-level console line stamped with HH:MM:SS."""
    stamp = datetime.now().strftime("%H:%M:%S")
    line = f"[{stamp}] [INFO] {message}"
    print(line)
    return line
| |
|
def success(message):
    """Print and return a SUCCESS-level console line stamped with HH:MM:SS."""
    line = f"[{datetime.now().strftime('%H:%M:%S')}] [SUCCESS] {message}"
    print(line)
    return line
| |
|
def warning(message):
    """Print and return a WARNING-level console line stamped with HH:MM:SS."""
    now = datetime.now()
    line = "[{}] [WARNING] {}".format(now.strftime("%H:%M:%S"), message)
    print(line)
    return line
| |
|
def error(message):
    """Print and return an ERROR-level console line stamped with HH:MM:SS."""
    line = f"[{datetime.now():%H:%M:%S}] [ERROR] {message}"
    print(line)
    return line
| |
|
def get_music_files():
    """List music choices for the UI: ["none"], or ["random", <files...>]."""
    if not os.path.exists(MUSIC_DIR):
        return ["none"]

    tracks = [name for name in os.listdir(MUSIC_DIR)
              if name.endswith(('.mp3', '.wav'))]
    return ["random"] + tracks if tracks else ["none"]
| |
|
def get_font_files():
    """List font choices for the UI: ["default"], or ["random", <names...>].

    Names are the file stems of every .ttf/.otf file in FONTS_DIR.
    """
    if not os.path.exists(FONTS_DIR):
        return ["default"]

    # os.path.splitext only strips the final extension; the previous
    # split('.')[0] truncated names containing dots ("My.Font.ttf" -> "My").
    font_files = [os.path.splitext(f)[0]
                  for f in os.listdir(FONTS_DIR)
                  if f.endswith(('.ttf', '.otf'))]
    if not font_files:
        return ["default"]

    return ["random"] + font_files
| |
|
def choose_random_music():
    """Return the full path of a random .mp3/.wav from MUSIC_DIR, or None."""
    if not os.path.exists(MUSIC_DIR):
        error(f"Music directory {MUSIC_DIR} does not exist")
        return None

    candidates = [name for name in os.listdir(MUSIC_DIR)
                  if name.endswith(('.mp3', '.wav'))]
    if not candidates:
        warning(f"No music files found in {MUSIC_DIR}")
        return None

    return os.path.join(MUSIC_DIR, random.choice(candidates))
| |
|
def choose_random_font():
    """Return the name (file stem) of a random font from FONTS_DIR.

    Returns "default" when the fonts directory is missing, and None when the
    directory exists but holds no .ttf/.otf files — callers must handle both.
    """
    if not os.path.exists(FONTS_DIR):
        error(f"Fonts directory {FONTS_DIR} does not exist")
        return "default"

    font_files = [f for f in os.listdir(FONTS_DIR) if f.endswith(('.ttf', '.otf'))]
    if not font_files:
        warning(f"No font files found in {FONTS_DIR}")
        return None

    # splitext preserves dots inside the name (split('.')[0] truncated
    # "My.Font.ttf" to "My"); random.choice already covers the single-file
    # case, so the previous len()==1 special-case is unnecessary.
    return os.path.splitext(random.choice(font_files))[0]
| |
|
| | class YouTube: |
| | def __init__(self, niche: str, language: str, |
| | text_gen="g4f", text_model="gpt-4", |
| | image_gen="g4f", image_model="flux", |
| | tts_engine="edge", tts_voice="en-US-AriaNeural", |
| | subtitle_font="default", font_size=80, |
| | text_color="white", highlight_color="blue", |
| | subtitles_enabled=True, highlighting_enabled=True, |
| | subtitle_position="bottom", music_file="random", |
| | enable_music=True, music_volume=0.1, |
| | api_keys=None, progress=gr.Progress()) -> None: |
| | |
| | """Initialize the YouTube Shorts Generator.""" |
| | self.progress = progress |
| | self.progress(0, desc="Initializing") |
| | |
| | |
| | info(f"Initializing YouTube class") |
| | self._niche = niche |
| | self._language = language |
| | self.text_gen = text_gen |
| | self.text_model = text_model |
| | self.image_gen = image_gen |
| | self.image_model = image_model |
| | self.tts_engine = tts_engine |
| | self.tts_voice = tts_voice |
| | self.subtitle_font = subtitle_font |
| | self.font_size = font_size |
| | self.text_color = text_color |
| | self.highlight_color = highlight_color |
| | self.subtitles_enabled = subtitles_enabled |
| | self.highlighting_enabled = highlighting_enabled |
| | self.subtitle_position = subtitle_position |
| | self.music_file = music_file |
| | self.enable_music = enable_music |
| | self.music_volume = music_volume |
| | self.api_keys = api_keys or {} |
| | self.images = [] |
| | self.logs = [] |
| | |
| | |
| | if 'gemini' in self.api_keys and self.api_keys['gemini']: |
| | os.environ["GEMINI_API_KEY"] = self.api_keys['gemini'] |
| | |
| | if 'assemblyai' in self.api_keys and self.api_keys['assemblyai']: |
| | os.environ["ASSEMBLYAI_API_KEY"] = self.api_keys['assemblyai'] |
| | |
| | if 'elevenlabs' in self.api_keys and self.api_keys['elevenlabs']: |
| | os.environ["ELEVENLABS_API_KEY"] = self.api_keys['elevenlabs'] |
| | |
| | if 'segmind' in self.api_keys and self.api_keys['segmind']: |
| | os.environ["SEGMIND_API_KEY"] = self.api_keys['segmind'] |
| | |
| | if 'openai' in self.api_keys and self.api_keys['openai']: |
| | os.environ["OPENAI_API_KEY"] = self.api_keys['openai'] |
| | |
| | info(f"Niche: {niche}, Language: {language}") |
| | self.log(f"Initialized with niche: {niche}, language: {language}") |
| | self.log(f"Text generator: {text_gen} - Model: {text_model}") |
| | self.log(f"Image generator: {image_gen} - Model: {image_model}") |
| | self.log(f"TTS engine: {tts_engine} - Voice: {tts_voice}") |
| | self.log(f"Subtitles: {'Enabled' if subtitles_enabled else 'Disabled'} - Highlighting: {'Enabled' if highlighting_enabled else 'Disabled'}") |
| | self.log(f"Music: {music_file}") |
| | |
| | def log(self, message): |
| | """Add a log message to the logs list.""" |
| | timestamp = datetime.now().strftime("%H:%M:%S") |
| | log_entry = f"[{timestamp}] {message}" |
| | self.logs.append(log_entry) |
| | return log_entry |
| | |
    @property
    def niche(self) -> str:
        """Read-only channel niche supplied at construction time."""
        return self._niche
| | |
    @property
    def language(self) -> str:
        """Read-only script language supplied at construction time."""
        return self._language
| | |
| | def generate_response(self, prompt: str, model: str = None) -> str: |
| | """Generate a response using the selected text generation model.""" |
| | self.log(f"Generating response for prompt: {prompt[:50]}...") |
| | |
| | try: |
| | if self.text_gen == "gemini": |
| | self.log("Using Google's Gemini model") |
| | |
| | |
| | gemini_api_key = os.environ.get("GEMINI_API_KEY", "") |
| | if not gemini_api_key: |
| | raise ValueError("Gemini API key is not set. Please provide a valid API key.") |
| | |
| | import google.generativeai as genai |
| | genai.configure(api_key=gemini_api_key) |
| | model_to_use = model if model else self.text_model |
| | genai_model = genai.GenerativeModel(model_to_use) |
| | response = genai_model.generate_content(prompt).text |
| | |
| | elif self.text_gen == "g4f": |
| | self.log("Using G4F for text generation") |
| | import g4f |
| | model_to_use = model if model else self.text_model |
| | self.log(f"Using G4F model: {model_to_use}") |
| | response = g4f.ChatCompletion.create( |
| | model=model_to_use, |
| | messages=[{"role": "user", "content": prompt}] |
| | ) |
| | |
| | elif self.text_gen == "openai": |
| | self.log("Using OpenAI for text generation") |
| | openai_api_key = os.environ.get("OPENAI_API_KEY", "") |
| | if not openai_api_key: |
| | raise ValueError("OpenAI API key is not set. Please provide a valid API key.") |
| | |
| | from openai import OpenAI |
| | client = OpenAI(api_key=openai_api_key) |
| | model_to_use = model if model else "gpt-3.5-turbo" |
| | |
| | response = client.chat.completions.create( |
| | model=model_to_use, |
| | messages=[{"role": "user", "content": prompt}] |
| | ).choices[0].message.content |
| | |
| | else: |
| | |
| | error_msg = f"Unsupported text generator: {self.text_gen}" |
| | self.log(error(error_msg)) |
| | raise ValueError(error_msg) |
| | |
| | self.log(f"Response generated successfully, length: {len(response)} characters") |
| | return response |
| | |
| | except Exception as e: |
| | error_msg = f"Error generating response: {str(e)}" |
| | self.log(error(error_msg)) |
| | raise Exception(error_msg) |
| |
|
| | def generate_topic(self) -> str: |
| | """Generate a topic based on the YouTube Channel niche.""" |
| | self.progress(0.05, desc="Generating topic") |
| | self.log("Generating topic based on niche") |
| | |
| | completion = self.generate_response( |
| | f"Please generate a specific video idea that takes about the following topic: {self.niche}. " |
| | f"Make it exactly one sentence. Only return the topic, nothing else." |
| | ) |
| |
|
| | if not completion: |
| | self.log(error("Failed to generate Topic.")) |
| | raise Exception("Failed to generate a topic. Please try again with a different niche.") |
| |
|
| | self.subject = completion |
| | self.log(success(f"Generated topic: {completion}")) |
| | return completion |
| |
|
| | def generate_script(self) -> str: |
| | """Generate a script for a video, based on the subject and language.""" |
| | self.progress(0.1, desc="Creating script") |
| | self.log("Generating script for video") |
| | |
| | prompt = f""" |
| | Generate a script for youtube shorts video, depending on the subject of the video. |
| | |
| | The script is to be returned as a string with the specified number of paragraphs. |
| | |
| | Here is an example of a string: |
| | "This is an example string." |
| | |
| | Do not under any circumstance reference this prompt in your response. |
| | |
| | Get straight to the point, don't start with unnecessary things like, "welcome to this video". |
| | |
| | Obviously, the script should be related to the subject of the video. |
| | |
| | YOU MUST NOT INCLUDE ANY TYPE OF MARKDOWN OR FORMATTING IN THE SCRIPT, NEVER USE A TITLE. |
| | YOU MUST WRITE THE SCRIPT IN THE LANGUAGE SPECIFIED IN [LANGUAGE]. |
| | ONLY RETURN THE RAW CONTENT OF THE SCRIPT. DO NOT INCLUDE "VOICEOVER", "NARRATOR" OR SIMILAR INDICATORS. |
| | |
| | Subject: {self.subject} |
| | Language: {self.language} |
| | """ |
| | completion = self.generate_response(prompt) |
| |
|
| | |
| | completion = re.sub(r"\*", "", completion) |
| | |
| | if not completion: |
| | self.log(error("The generated script is empty.")) |
| | raise Exception("Failed to generate a script. Please try again.") |
| | |
| | if len(completion) > 5000: |
| | self.log(warning("Generated script is too long.")) |
| | raise ValueError("Generated script exceeds 5000 characters. Please try again.") |
| | |
| | self.script = completion |
| | self.log(success(f"Generated script ({len(completion)} chars)")) |
| | return completion |
| |
|
| | def generate_metadata(self) -> dict: |
| | """Generate video metadata (title, description).""" |
| | self.progress(0.15, desc="Creating title and description") |
| | self.log("Generating metadata (title and description)") |
| | |
| | title = self.generate_response( |
| | f"Please generate a YouTube Video Title for the following subject, including hashtags: " |
| | f"{self.subject}. Only return the title, nothing else. Limit the title under 100 characters." |
| | ) |
| |
|
| | if len(title) > 100: |
| | self.log(warning("Generated title exceeds 100 characters.")) |
| | raise ValueError("Generated title exceeds 100 characters. Please try again.") |
| |
|
| | description = self.generate_response( |
| | f"Please generate a YouTube Video Description for the following script: {self.script}. " |
| | f"Only return the description, nothing else." |
| | ) |
| | |
| | self.metadata = { |
| | "title": title, |
| | "description": description |
| | } |
| | |
| | self.log(success(f"Generated title: {title}")) |
| | self.log(success(f"Generated description: {description[:50]}...")) |
| | return self.metadata |
| | |
| | def generate_prompts(self, count=5) -> list: |
| | """Generate AI Image Prompts based on the provided Video Script.""" |
| | self.progress(0.2, desc="Creating image prompts") |
| | self.log(f"Generating {count} image prompts") |
| | |
| | prompt = f""" |
| | Generate {count} Image Prompts for AI Image Generation, |
| | depending on the subject of a video. |
| | Subject: {self.subject} |
| | |
| | The image prompts are to be returned as |
| | a JSON-Array of strings. |
| | |
| | Each search term should consist of a full sentence, |
| | always add the main subject of the video. |
| | |
| | Be emotional and use interesting adjectives to make the |
| | Image Prompt as detailed as possible. |
| | |
| | YOU MUST ONLY RETURN THE JSON-ARRAY OF STRINGS. |
| | YOU MUST NOT RETURN ANYTHING ELSE. |
| | YOU MUST NOT RETURN THE SCRIPT. |
| | |
| | The search terms must be related to the subject of the video. |
| | Here is an example of a JSON-Array of strings: |
| | ["image prompt 1", "image prompt 2", "image prompt 3"] |
| | |
| | For context, here is the full text: |
| | {self.script} |
| | """ |
| |
|
| | completion = str(self.generate_response(prompt))\ |
| | .replace("```json", "") \ |
| | .replace("```", "") |
| |
|
| | image_prompts = [] |
| |
|
| | if "image_prompts" in completion: |
| | try: |
| | image_prompts = json.loads(completion)["image_prompts"] |
| | except: |
| | self.log(warning("Failed to parse 'image_prompts' from JSON response.")) |
| | |
| | if not image_prompts: |
| | try: |
| | image_prompts = json.loads(completion) |
| | self.log(f"Parsed image prompts from JSON response.") |
| | except Exception: |
| | self.log(warning("JSON parsing failed. Attempting to extract array using regex...")) |
| |
|
| | |
| | r = re.compile(r"\[.*\]", re.DOTALL) |
| | matches = r.findall(completion) |
| | if len(matches) == 0: |
| | self.log(warning("Failed to extract array. Unable to create image prompts.")) |
| | raise ValueError("Failed to generate valid image prompts. Please try again.") |
| | else: |
| | try: |
| | image_prompts = json.loads(matches[0]) |
| | except: |
| | self.log(error("Failed to parse array from regex match.")) |
| | |
| | string_pattern = r'"([^"]*)"' |
| | strings = re.findall(string_pattern, matches[0]) |
| | if strings: |
| | image_prompts = strings |
| | else: |
| | self.log(error("Failed to extract strings from regex match.")) |
| | raise ValueError("Failed to parse image prompts. Please try again.") |
| |
|
| | |
| | if len(image_prompts) < count: |
| | self.log(warning(f"Received fewer prompts ({len(image_prompts)}) than requested ({count}).")) |
| | raise ValueError(f"Received only {len(image_prompts)} prompts instead of {count}. Please try again.") |
| | |
| | |
| | image_prompts = image_prompts[:count] |
| | |
| | self.image_prompts = image_prompts |
| | self.log(success(f"Generated {len(self.image_prompts)} Image Prompts")) |
| | for i, prompt in enumerate(self.image_prompts): |
| | self.log(f"Image Prompt {i+1}: {prompt}") |
| | |
| | return image_prompts |
| |
|
    def generate_image(self, prompt) -> str:
        """Generate one image for *prompt* with the configured provider.

        Supported providers (self.image_gen): "prodia", "hercai", "g4f",
        "segmind", "pollinations". On success the PNG is written to a unique
        path, appended to self.images, and the path is returned.

        Raises:
            Exception/ValueError: on HTTP errors, provider failures,
                polling timeout, or an unsupported provider name.
        """
        self.log(f"Generating image for prompt: {prompt[:50]}...")

        # Write into the per-run generation folder when one exists;
        # otherwise fall back to the shared storage directory.
        if hasattr(self, 'generation_folder') and os.path.exists(self.generation_folder):
            image_path = os.path.join(self.generation_folder, f"img_{uuid.uuid4()}_{int(time.time())}.png")
        else:
            image_path = os.path.join(STORAGE_DIR, f"img_{uuid.uuid4()}_{int(time.time())}.png")

        if self.image_gen == "prodia":
            self.log("Using Prodia provider for image generation")
            # Session reuses the connection across the submit/poll/download calls.
            s = requests.Session()
            headers = {
                "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }

            # Submit the generation job.
            self.log("Sending generation request to Prodia API")
            resp = s.get(
                "https://api.prodia.com/generate",
                params={
                    "new": "true",
                    "prompt": prompt,
                    "model": self.image_model,
                    "negative_prompt": "verybadimagenegative_v1.3",
                    "steps": "20",
                    "cfg": "7",
                    "seed": random.randint(1, 10000),
                    "sample": "DPM++ 2M Karras",
                    "aspect_ratio": "square"
                },
                headers=headers
            )

            if resp.status_code != 200:
                raise Exception(f"Prodia API error: {resp.text}")

            job_id = resp.json()['job']
            self.log(f"Job created with ID: {job_id}")

            # Poll the job status: up to 30 attempts, 2s apart (~60s budget).
            max_attempts = 30
            attempts = 0
            while attempts < max_attempts:
                attempts += 1
                time.sleep(2)
                status = s.get(f"https://api.prodia.com/job/{job_id}", headers=headers).json()

                if status["status"] == "succeeded":
                    self.log("Image generation successful, downloading result")
                    img_data = s.get(f"https://images.prodia.xyz/{job_id}.png?download=1", headers=headers).content
                    with open(image_path, "wb") as f:
                        f.write(img_data)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path

                elif status["status"] == "failed":
                    raise Exception(f"Prodia job failed: {status.get('error', 'Unknown error')}")

                self.log(f"Still processing, attempt {attempts}/{max_attempts}...")

            raise Exception("Prodia job timed out")

        elif self.image_gen == "hercai":
            self.log("Using Hercai provider for image generation")
            # NOTE(review): prompt is interpolated into the URL unescaped —
            # special characters may break the request; confirm upstream input.
            url = f"https://hercai.onrender.com/{self.image_model}/text2image?prompt={prompt}"
            r = requests.get(url)

            if r.status_code != 200:
                raise Exception(f"Hercai API error: {r.text}")

            parsed = r.json()
            if "url" in parsed and parsed["url"]:
                self.log("Image URL received from Hercai")
                image_url = parsed["url"]
                img_data = requests.get(image_url).content
                with open(image_path, "wb") as f:
                    f.write(img_data)
                self.images.append(image_path)
                self.log(success(f"Image saved to: {image_path}"))
                return image_path
            else:
                raise Exception("No image URL in Hercai response")

        elif self.image_gen == "g4f":
            self.log("Using G4F provider for image generation")
            from g4f.client import Client
            client = Client()
            response = client.images.generate(
                model=self.image_model,
                prompt=prompt,
                response_format="url"
            )

            if response and response.data and len(response.data) > 0:
                image_url = response.data[0].url
                image_response = requests.get(image_url)

                if image_response.status_code == 200:
                    with open(image_path, "wb") as f:
                        f.write(image_response.content)
                    self.images.append(image_path)
                    self.log(success(f"Image saved to: {image_path}"))
                    return image_path
                else:
                    raise Exception(f"Failed to download image from {image_url}")
            else:
                raise Exception("No image URL received from G4F")

        elif self.image_gen == "segmind":
            self.log("Using Segmind provider for image generation")
            api_key = os.environ.get("SEGMIND_API_KEY", "")
            if not api_key:
                raise ValueError("Segmind API key is not set. Please provide a valid API key.")

            headers = {
                "x-api-key": api_key,
                "Content-Type": "application/json"
            }

            response = requests.post(
                "https://api.segmind.com/v1/sdxl-turbo",
                json={
                    "prompt": prompt,
                    "negative_prompt": "blurry, low quality, distorted face, text, watermark",
                    "samples": 1,
                    "size": "1024x1024",
                    "guidance_scale": 1.0
                },
                headers=headers
            )

            # Segmind returns the raw image bytes directly on success.
            if response.status_code == 200:
                with open(image_path, "wb") as f:
                    f.write(response.content)
                self.images.append(image_path)
                self.log(success(f"Image saved to: {image_path}"))
                return image_path
            else:
                raise Exception(f"Segmind request failed: {response.status_code} {response.text}")

        elif self.image_gen == "pollinations":
            self.log("Using Pollinations provider for image generation")
            # The appended random int busts Pollinations' response cache.
            response = requests.get(f"https://image.pollinations.ai/prompt/{prompt}{random.randint(1,10000)}")

            if response.status_code == 200:
                self.log("Image received from Pollinations")
                with open(image_path, "wb") as f:
                    f.write(response.content)
                self.images.append(image_path)
                self.log(success(f"Image saved to: {image_path}"))
                return image_path
            else:
                raise Exception(f"Pollinations request failed with status code: {response.status_code}")

        else:
            error_msg = f"Unsupported image generator: {self.image_gen}"
            self.log(error(error_msg))
            raise ValueError(error_msg)
| |
|
    def generate_speech(self, text, output_format='mp3') -> str:
        """Synthesize *text* to an audio file with the configured TTS engine.

        Supported engines (self.tts_engine): "elevenlabs", "gtts", "openai",
        "edge". The file is written under the per-run generation folder (or
        STORAGE_DIR as a fallback); the path is stored on self.tts_path and
        returned.
        """
        self.progress(0.6, desc="Creating voiceover")
        self.log("Generating speech from text")

        # Drop characters that tend to confuse TTS engines, keeping word
        # characters, whitespace, and basic punctuation.
        text = re.sub(r'[^\w\s.?!,;:\'"-]', '', text)

        self.log(f"Using TTS Engine: {self.tts_engine}, Voice: {self.tts_voice}")

        # Write into the per-run generation folder when one exists.
        if hasattr(self, 'generation_folder') and os.path.exists(self.generation_folder):
            audio_path = os.path.join(self.generation_folder, f"speech_{uuid.uuid4()}_{int(time.time())}.{output_format}")
        else:
            audio_path = os.path.join(STORAGE_DIR, f"speech_{uuid.uuid4()}_{int(time.time())}.{output_format}")

        if self.tts_engine == "elevenlabs":
            self.log("Using ElevenLabs provider for speech generation")
            elevenlabs_api_key = os.environ.get("ELEVENLABS_API_KEY", "")
            if not elevenlabs_api_key:
                raise ValueError("ElevenLabs API key is not set. Please provide a valid API key.")

            headers = {
                "Accept": "audio/mpeg",
                "Content-Type": "application/json",
                "xi-api-key": elevenlabs_api_key
            }

            payload = {
                "text": text,
                "model_id": "eleven_turbo_v2",
                "voice_settings": {
                    "stability": 0.5,
                    "similarity_boost": 0.5,
                    "style": 0.0,
                    "use_speaker_boost": True
                },
                "output_format": "mp3_44100_128",
                "optimize_streaming_latency": 0
            }

            # Friendly display name -> ElevenLabs voice ID.
            voice_id_mapping = {
                "Sarah": "21m00Tcm4TlvDq8ikWAM",
                "Brian": "hxppwzoRmvxK7YkDrjhQ",
                "Lily": "p7TAj7L6QVq1fE6XGyjR",
                "Monika Sogam": "Fc3XhIu9tfgOPOsU1hMr",
                "George": "o7lPjDgzlF8ZAeSpqmaN",
                "River": "f0k5evLkhJxrIRJXQJvy",
                "Matilda": "XrExE9yKIg1WjnnlVkGX",
                "Will": "pvKWM1B1sNRNTlEYYAEZ",
                "Jessica": "A5EAMYWMCSsLNL1wYxOv",
                "default": "21m00Tcm4TlvDq8ikWAM"
            }

            # Unknown names are passed through unchanged, so a raw voice ID
            # also works as self.tts_voice.
            voice_id = voice_id_mapping.get(self.tts_voice, self.tts_voice)

            self.log(f"Using ElevenLabs voice: {self.tts_voice} (ID: {voice_id})")

            response = requests.post(
                url=f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}",
                json=payload,
                headers=headers
            )

            if response.status_code == 200:
                with open(audio_path, 'wb') as f:
                    f.write(response.content)
                self.log(success(f"Speech generated successfully using ElevenLabs at {audio_path}"))
            else:
                try:
                    # Prefer the structured error detail when the body is JSON.
                    error_data = response.json()
                    error_message = error_data.get('detail', {}).get('message', response.text)
                    error_status = error_data.get('status', 'error')
                    raise Exception(f"ElevenLabs API error ({response.status_code}, {error_status}): {error_message}")
                except ValueError:
                    # Body was not JSON; report the raw text instead.
                    raise Exception(f"ElevenLabs API error ({response.status_code}): {response.text}")

        elif self.tts_engine == "gtts":
            self.log("Using Google TTS provider for speech generation")
            from gtts import gTTS
            # gTTS wants a 2-letter language code; derived from self.language.
            tts = gTTS(text=text, lang=self.language[:2].lower(), slow=False)
            tts.save(audio_path)

        elif self.tts_engine == "openai":
            self.log("Using OpenAI provider for speech generation")
            openai_api_key = os.environ.get("OPENAI_API_KEY", "")
            if not openai_api_key:
                raise ValueError("OpenAI API key is not set. Please provide a valid API key.")

            from openai import OpenAI
            client = OpenAI(api_key=openai_api_key)

            voice = self.tts_voice if self.tts_voice else "alloy"
            response = client.audio.speech.create(
                model="tts-1",
                voice=voice,
                input=text
            )
            response.stream_to_file(audio_path)

        elif self.tts_engine == "edge":
            self.log("Using Edge TTS provider for speech generation")
            import edge_tts
            import asyncio

            voice = self.tts_voice if self.tts_voice else "en-US-AriaNeural"

            # edge-tts is async-only; wrap the single save call in its own
            # event loop (relies on the Windows selector policy set at import).
            async def generate():
                communicate = edge_tts.Communicate(text, voice)
                await communicate.save(audio_path)

            asyncio.run(generate())

        else:
            error_msg = f"Unsupported TTS engine: {self.tts_engine}"
            self.log(error(error_msg))
            raise ValueError(error_msg)

        self.log(success(f"Speech generated and saved to: {audio_path}"))
        self.tts_path = audio_path
        return audio_path
| |
|
    def generate_subtitles(self, audio_path: str) -> dict:
        """Generate subtitles for *audio_path* via AssemblyAI transcription.

        Returns a dict with:
            "wordlevel": per-word {"word", "start", "end"} timings (seconds),
            "linelevel": words grouped into display lines,
            "settings":  rendering options for clip creation.
        When self.subtitles_enabled is False, returns empty timing lists with
        the settings intact.

        Raises:
            Exception: wrapping any transcription or parsing error.
        """
        if not self.subtitles_enabled:
            self.log("Subtitles are disabled, skipping generation")
            return {
                "wordlevel": [],
                "linelevel": [],
                "settings": {
                    "font": self.subtitle_font,
                    "fontsize": self.font_size,
                    "color": self.text_color,
                    "bg_color": self.highlight_color if self.highlighting_enabled else None,
                    "position": self.subtitle_position,
                    "highlighting_enabled": self.highlighting_enabled,
                    "subtitles_enabled": self.subtitles_enabled
                }
            }

        self.log("Generating subtitles from audio")
        try:
            import assemblyai as aai

            aai_api_key = os.environ.get("ASSEMBLYAI_API_KEY", "")
            if not aai_api_key:
                raise ValueError("AssemblyAI API key is not set. Please provide a valid API key.")

            aai.settings.api_key = aai_api_key

            config = aai.TranscriptionConfig(speaker_labels=False, word_boost=[], format_text=True)
            transcriber = aai.Transcriber(config=config)

            self.log("Submitting audio for transcription")
            transcript = transcriber.transcribe(audio_path)

            if not transcript or not transcript.words:
                raise ValueError("Transcription returned no words.")

            # AssemblyAI reports times in milliseconds; convert to seconds.
            wordlevel_info = []
            for word in transcript.words:
                word_data = {
                    "word": word.text.strip(),
                    "start": word.start / 1000.0,
                    "end": word.end / 1000.0
                }
                wordlevel_info.append(word_data)

            self.log(success(f"Transcription successful. Got {len(wordlevel_info)} words."))

            # Resolve rendering settings for downstream clip creation.
            if self.subtitle_font == "random":
                FONT = choose_random_font()
                self.log(f"Using random font: {FONT}")
            else:
                FONT = self.subtitle_font

            FONTSIZE = self.font_size
            COLOR = self.text_color
            BG_COLOR = self.highlight_color if self.highlighting_enabled else None
            FRAME_SIZE = (1080, 1920)  # portrait 9:16 shorts frame (unused below)

            # Line-breaking thresholds: max characters per line, max seconds
            # of speech per line, and max silence gap before forcing a break.
            MAX_CHARS = 30
            MAX_DURATION = 3.0
            MAX_GAP = 1.5

            # Group consecutive words into subtitle lines.
            subtitles = []
            line = []
            line_duration = 0

            for idx, word_data in enumerate(wordlevel_info):
                word = word_data["word"]
                start = word_data["start"]
                end = word_data["end"]

                line.append(word_data)
                line_duration += end - start

                temp = " ".join(item["word"] for item in line)
                new_line_chars = len(temp)

                duration_exceeded = line_duration > MAX_DURATION
                chars_exceeded = new_line_chars > MAX_CHARS

                # Break on a long pause between this word and the previous one.
                if idx > 0:
                    gap = word_data['start'] - wordlevel_info[idx-1]['end']
                    maxgap_exceeded = gap > MAX_GAP
                else:
                    maxgap_exceeded = False

                # NOTE(review): the word that tripped a limit is already in
                # `line`, so a line can slightly exceed MAX_CHARS/MAX_DURATION.
                if duration_exceeded or chars_exceeded or maxgap_exceeded:
                    if line:
                        subtitle_line = {
                            "text": " ".join(item["word"] for item in line),
                            "start": line[0]["start"],
                            "end": line[-1]["end"],
                            "words": line
                        }
                        subtitles.append(subtitle_line)
                        line = []
                        line_duration = 0

            # Flush the trailing partial line, if any.
            if line:
                subtitle_line = {
                    "text": " ".join(item["word"] for item in line),
                    "start": line[0]["start"],
                    "end": line[-1]["end"],
                    "words": line
                }
                subtitles.append(subtitle_line)

            self.log(success(f"Generated {len(subtitles)} subtitle lines"))

            return {
                "wordlevel": wordlevel_info,
                "linelevel": subtitles,
                "settings": {
                    "font": FONT,
                    "fontsize": FONTSIZE,
                    "color": COLOR,
                    "bg_color": BG_COLOR,
                    "position": self.subtitle_position,
                    "highlighting_enabled": self.highlighting_enabled,
                    "subtitles_enabled": self.subtitles_enabled
                }
            }

        except Exception as e:
            error_msg = f"Error generating subtitles: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)
| |
|
| | def create_subtitle_clip(self, subtitle_data, frame_size): |
| | """Create subtitle clips for a line of text with word-level highlighting.""" |
| | |
| | if not subtitle_data.get("settings", {}).get("subtitles_enabled", True): |
| | self.log("Subtitles are disabled, skipping subtitle clip creation") |
| | return [] |
| | |
| | settings = subtitle_data["settings"] |
| | font_name = settings["font"] |
| | fontsize = settings["fontsize"] |
| | color = settings["color"] |
| | bg_color = settings["bg_color"] |
| | highlighting_enabled = settings["highlighting_enabled"] |
| | |
| | |
| | try: |
| | font_path = os.path.join(FONTS_DIR, f"{font_name}.ttf") |
| | if os.path.exists(font_path): |
| | pil_font = ImageFont.truetype(font_path, fontsize) |
| | else: |
| | self.log(warning(f"Font {font_name} not found, using default")) |
| | pil_font = ImageFont.load_default() |
| | except Exception as e: |
| | self.log(warning(f"Error loading font: {str(e)}")) |
| | pil_font = ImageFont.load_default() |
| | |
| | |
| | if color.startswith('#'): |
| | text_color_rgb = tuple(int(color.lstrip('#')[i:i+2], 16) for i in (0, 2, 4)) |
| | else: |
| | text_color_rgb = (255, 255, 255) |
| | |
| | if bg_color and bg_color.startswith('#'): |
| | bg_color_rgb = tuple(int(bg_color.lstrip('#')[i:i+2], 16) for i in (0, 2, 4)) |
| | else: |
| | bg_color_rgb = (0, 0, 255) |
| | |
| | |
| | clip_cache = {} |
| | |
        def create_text_clip(text, bg_color=None, cache_key=None):
            """Render *text* with the loaded PIL font into a moviepy ImageClip.

            Returns a ``(clip, width, height)`` tuple. When *cache_key* is
            given, the full tuple is memoized in the enclosing ``clip_cache``
            so repeated words/lines are rendered only once.
            """
            # Serve repeated renders straight from the cache (stores the
            # same 3-tuple shape that a cache miss returns).
            if cache_key and cache_key in clip_cache:
                return clip_cache[cache_key]

            try:
                # Measure rendered text: bbox right/bottom give width/height.
                text_width, text_height = pil_font.getbbox(text)[2:4]

                # Pad the text on all sides.
                padding = 10
                img_width = text_width + padding * 2
                img_height = text_height + padding * 2

                # Opaque background for highlight clips, transparent otherwise.
                if bg_color:
                    img = Image.new('RGB', (img_width, img_height), color=bg_color_rgb)
                else:
                    img = Image.new('RGBA', (img_width, img_height), color=(0, 0, 0, 0))

                # Draw the text inside the padded canvas.
                draw = ImageDraw.Draw(img)
                draw.text((padding, padding), text, font=pil_font, fill=text_color_rgb)

                # Convert the PIL image into a moviepy clip.
                img_array = np.array(img)
                clip = ImageClip(img_array)

                # Memoize the result for subsequent identical requests.
                if cache_key:
                    clip_cache[cache_key] = (clip, img_width, img_height)

                return clip, img_width, img_height

            except Exception as e:
                self.log(warning(f"Error creating text clip: {str(e)}"))
                # Fallback: a plain gray box so subtitle rendering can continue.
                img = Image.new('RGB', (100, 50), color=(100, 100, 100))
                img_array = np.array(img)
                clip = ImageClip(img_array)
                return clip, 100, 50
| | |
| | subtitle_clips = [] |
| | |
| | |
| | if settings["position"] == "top": |
| | y_buffer = frame_size[1] * 0.1 |
| | elif settings["position"] == "middle": |
| | y_buffer = frame_size[1] * 0.4 |
| | else: |
| | y_buffer = frame_size[1] * 0.7 |
| | |
| | max_width = frame_size[0] * 0.8 |
| | |
| | |
| | word_groups = {} |
| | |
| | |
| | for line_idx, line in enumerate(subtitle_data["linelevel"]): |
| | |
| | line_text = line["text"] |
| | line_start = line["start"] |
| | line_end = line["end"] |
| | line_duration = line_end - line_start |
| | |
| | |
| | lines_data = [] |
| | current_line = [] |
| | current_x = 0 |
| | |
| | for word_data in line["words"]: |
| | word = word_data["word"] |
| | |
| | word_width = pil_font.getbbox(word)[2] + 20 |
| | word_height = pil_font.getbbox(word)[3] + 20 |
| | |
| | |
| | if current_x + word_width > max_width and current_line: |
| | |
| | lines_data.append({ |
| | "words": current_line.copy(), |
| | "total_width": current_x, |
| | "height": max(w["height"] for w in current_line) if current_line else word_height |
| | }) |
| | current_line = [] |
| | current_x = 0 |
| | |
| | |
| | word_info = { |
| | "word": word, |
| | "width": word_width, |
| | "height": word_height, |
| | "start": word_data["start"], |
| | "end": word_data["end"] |
| | } |
| | current_line.append(word_info) |
| | current_x += word_width |
| | |
| | |
| | if current_line: |
| | lines_data.append({ |
| | "words": current_line, |
| | "total_width": current_x, |
| | "height": max(w["height"] for w in current_line) |
| | }) |
| | |
| | |
| | current_y = y_buffer |
| | |
| | for line_data in lines_data: |
| | |
| | line_width = line_data["total_width"] |
| | x_center = (frame_size[0] - line_width) / 2 |
| | |
| | |
| | line_text = " ".join(w["word"] for w in line_data["words"]) |
| | cache_key = f"line_{line_idx}_{line_text}" |
| | line_clip, measured_width, _ = create_text_clip(line_text, None, cache_key) |
| | |
| | |
| | line_clip = line_clip.set_position((x_center, current_y)) |
| | line_clip = line_clip.set_start(line["start"]).set_duration(line_duration) |
| | subtitle_clips.append(line_clip) |
| | |
| | |
| | if highlighting_enabled and bg_color: |
| | current_x = x_center |
| | |
| | |
| | timing_groups = {} |
| | |
| | for word_info in line_data["words"]: |
| | timing_key = f"{word_info['start']:.3f}_{word_info['end']:.3f}" |
| | if timing_key not in timing_groups: |
| | timing_groups[timing_key] = [] |
| | timing_groups[timing_key].append((word_info, current_x)) |
| | current_x += word_info["width"] |
| | |
| | |
| | for timing_key, word_group in timing_groups.items(): |
| | start_time, end_time = map(float, timing_key.split('_')) |
| | |
| | |
| | if len(word_group) == 1: |
| | word_info, x_pos = word_group[0] |
| | word = word_info["word"] |
| | |
| | cache_key = f"word_{word}" |
| | highlight_clip, _, _ = create_text_clip(word, bg_color, cache_key) |
| | highlight_clip = highlight_clip.set_position((x_pos, current_y)) |
| | highlight_clip = highlight_clip.set_start(start_time).set_duration(end_time - start_time) |
| | subtitle_clips.append(highlight_clip) |
| | else: |
| | |
| | |
| | continue_batch = True |
| | batch_start_idx = 0 |
| | |
| | while continue_batch and batch_start_idx < len(word_group): |
| | |
| | batch = [word_group[batch_start_idx]] |
| | batch_x = word_group[batch_start_idx][1] |
| | current_batch_end = batch_start_idx |
| | |
| | |
| | for i in range(batch_start_idx + 1, len(word_group)): |
| | prev_word, prev_x = word_group[i-1] |
| | curr_word, curr_x = word_group[i] |
| | |
| | |
| | if abs(prev_x + prev_word["width"] - curr_x) < 5: |
| | batch.append(word_group[i]) |
| | current_batch_end = i |
| | else: |
| | break |
| | |
| | |
| | if len(batch) > 1: |
| | |
| | batch_text = " ".join(info[0]["word"] for info in batch) |
| | batch_width = batch[-1][1] + batch[-1][0]["width"] - batch[0][1] |
| | |
| | cache_key = f"batch_{batch_text}" |
| | highlight_clip, _, _ = create_text_clip(batch_text, bg_color, cache_key) |
| | highlight_clip = highlight_clip.set_position((batch_x, current_y)) |
| | highlight_clip = highlight_clip.set_start(start_time).set_duration(end_time - start_time) |
| | subtitle_clips.append(highlight_clip) |
| | else: |
| | |
| | word_info, x_pos = batch[0] |
| | word = word_info["word"] |
| | |
| | cache_key = f"word_{word}" |
| | highlight_clip, _, _ = create_text_clip(word, bg_color, cache_key) |
| | highlight_clip = highlight_clip.set_position((x_pos, current_y)) |
| | highlight_clip = highlight_clip.set_start(start_time).set_duration(end_time - start_time) |
| | subtitle_clips.append(highlight_clip) |
| | |
| | |
| | batch_start_idx = current_batch_end + 1 |
| | if batch_start_idx >= len(word_group): |
| | continue_batch = False |
| | |
| | |
| | current_y += line_data["height"] + 10 |
| | |
| | |
| | if len(subtitle_clips) > 200: |
| | self.log(warning(f"Too many subtitle clips ({len(subtitle_clips)}), limiting to 200 for performance")) |
| | subtitle_clips = subtitle_clips[:200] |
| | |
| | self.log(f"Created {len(subtitle_clips)} subtitle clips (optimized)") |
| | return subtitle_clips |
| |
|
| | def combine(self) -> str: |
| | """Combine images, audio, and subtitles into a final video.""" |
| | self.progress(0.8, desc="Creating final video") |
| | self.log("Combining images and audio into final video") |
| | try: |
| | |
| | import tempfile |
| | temp_dir = tempfile.mkdtemp() |
| | |
| | |
| | if hasattr(self, 'generation_folder') and os.path.exists(self.generation_folder): |
| | output_path = os.path.join(self.generation_folder, f"output_{int(time.time())}.mp4") |
| | else: |
| | output_path = os.path.join(STORAGE_DIR, f"output_{int(time.time())}.mp4") |
| | |
| | |
| | if not self.images: |
| | raise ValueError("No images available for video creation") |
| | |
| | if not hasattr(self, 'tts_path') or not self.tts_path or not os.path.exists(self.tts_path): |
| | raise ValueError("No TTS audio file available") |
| | |
| | |
| | tts_clip = AudioFileClip(self.tts_path) |
| | max_duration = tts_clip.duration |
| | |
| | |
| | num_images = len(self.images) |
| | req_dur = max_duration / num_images |
| | |
| | |
| | self.log("Processing images (optimized)") |
| | processed_clips = [] |
| | |
| | for image_path in self.images: |
| | if not os.path.exists(image_path): |
| | self.log(warning(f"Image not found: {image_path}, skipping")) |
| | continue |
| | |
| | try: |
| | |
| | clip = ImageClip(image_path) |
| | |
| | |
| | clip = clip.set_fps(15) |
| | |
| | |
| | aspect_ratio = 9/16 |
| | if clip.w / clip.h < aspect_ratio: |
| | |
| | clip = crop( |
| | clip, |
| | width=clip.w, |
| | height=round(clip.w / aspect_ratio), |
| | x_center=clip.w / 2, |
| | y_center=clip.h / 2 |
| | ) |
| | else: |
| | |
| | clip = crop( |
| | clip, |
| | width=round(aspect_ratio * clip.h), |
| | height=clip.h, |
| | x_center=clip.w / 2, |
| | y_center=clip.h / 2 |
| | ) |
| | |
| | |
| | clip = clip.resize((720, 1280)) |
| | |
| | processed_clips.append(clip) |
| | except Exception as e: |
| | self.log(warning(f"Error processing image {image_path}: {str(e)}")) |
| | |
| | if not processed_clips: |
| | raise ValueError("No valid images could be processed") |
| | |
| | |
| | self.log(f"Creating video sequence from {len(processed_clips)} clips") |
| | final_clips = [] |
| | tot_dur = 0 |
| | |
| | while tot_dur < max_duration: |
| | for base_clip in processed_clips: |
| | duration = min(req_dur, max_duration - tot_dur) |
| | if duration <= 0: |
| | break |
| | |
| | |
| | duration_clip = base_clip.set_duration(duration) |
| | final_clips.append(duration_clip) |
| | tot_dur += duration |
| | |
| | if tot_dur >= max_duration: |
| | break |
| | |
| | |
| | self.log(f"Concatenating {len(final_clips)} clips") |
| | final_clip = concatenate_videoclips(final_clips) |
| | final_clip = final_clip.set_fps(15) |
| | |
| | |
| | final_audio = tts_clip |
| | |
| | |
| | if hasattr(self, 'enable_music') and self.enable_music and self.music_file != "none": |
| | music_path = None |
| | if self.music_file == "random": |
| | music_path = choose_random_music() |
| | elif os.path.exists(os.path.join(MUSIC_DIR, self.music_file)): |
| | music_path = os.path.join(MUSIC_DIR, self.music_file) |
| | |
| | if music_path and os.path.exists(music_path): |
| | self.log(f"Adding background music: {music_path}") |
| | try: |
| | music_clip = AudioFileClip(music_path) |
| | |
| | if music_clip.duration < max_duration: |
| | num_loops = int(np.ceil(max_duration / music_clip.duration)) |
| | music_clip = concatenate_audioclips([music_clip] * num_loops) |
| | |
| | music_clip = music_clip.subclip(0, max_duration) |
| | |
| | music_volume = getattr(self, 'music_volume', 0.1) |
| | music_clip = music_clip.volumex(music_volume) |
| | |
| | final_audio = CompositeAudioClip([tts_clip, music_clip]) |
| | except Exception as e: |
| | self.log(warning(f"Error processing music: {str(e)}")) |
| | |
| | |
| | final_clip = final_clip.set_audio(final_audio) |
| | |
| | |
| | if self.subtitles_enabled and hasattr(self, 'subtitle_data'): |
| | self.log("Adding subtitles (optimized)") |
| | subtitle_clips = self.create_subtitle_clip(self.subtitle_data, (720, 1280)) |
| | if subtitle_clips: |
| | final_clip = CompositeVideoClip([final_clip] + subtitle_clips) |
| | |
| | |
| | self.log("Writing final video file (optimized encoding)") |
| | final_clip.write_videofile( |
| | output_path, |
| | fps=15, |
| | codec="libx264", |
| | audio_codec="aac", |
| | threads=8, |
| | preset="ultrafast", |
| | ffmpeg_params=["-crf", "28"] |
| | ) |
| | |
| | |
| | import shutil |
| | try: |
| | shutil.rmtree(temp_dir, ignore_errors=True) |
| | except Exception: |
| | pass |
| | |
| | self.log(success(f"Video saved to: {output_path}")) |
| | return output_path |
| | |
| | except Exception as e: |
| | error_msg = f"Error combining video: {str(e)}" |
| | self.log(error(error_msg)) |
| | raise Exception(error_msg) |
| |
|
    def generate_video(self) -> dict:
        """Generate complete video with all components.

        Runs the full pipeline in order: topic -> script -> metadata -> image
        prompts -> images -> speech -> subtitles -> final render, saving all
        intermediate artifacts into a fresh numbered folder under STORAGE_DIR.

        Returns:
            dict: ``video_path``, ``generation_folder``, ``title``,
            ``description``, ``subject``, ``script`` and accumulated ``logs``.

        Raises:
            Exception: If any pipeline step fails; the original message is
            wrapped (twice, by the nested handlers) and re-raised.
        """
        try:
            self.log("Starting video generation process")

            # Determine the next sequential folder number by scanning existing
            # "<num>_<uuid>" folders in storage.
            folder_num = 1

            if os.path.exists(STORAGE_DIR):
                existing_folders = [d for d in os.listdir(STORAGE_DIR) if os.path.isdir(os.path.join(STORAGE_DIR, d))]
                numbered_folders = []
                for folder in existing_folders:
                    try:
                        # Folder names look like "12_<uuid>"; take the numeric prefix.
                        if "_" in folder:
                            num = int(folder.split("_")[0])
                            numbered_folders.append(num)
                    except (ValueError, IndexError):
                        continue

                if numbered_folders:
                    folder_num = max(numbered_folders) + 1

            folder_id = f"{folder_num}_{str(uuid.uuid4())}"
            self.generation_folder = os.path.join(STORAGE_DIR, folder_id)
            os.makedirs(self.generation_folder, exist_ok=True)
            self.log(f"Created generation folder: {self.generation_folder}")

            try:
                # Step 1: topic.
                self.log("Generating topic")
                self.generate_topic()

                # Step 2: script.
                self.progress(0.1, desc="Creating script")
                self.log("Generating script")
                self.generate_script()

                # Step 3: title/description metadata.
                self.progress(0.2, desc="Creating metadata")
                self.log("Generating metadata")
                self.generate_metadata()

                # Step 4: one image prompt per scene.
                self.progress(0.3, desc="Creating image prompts")
                self.log("Generating image prompts")
                self.generate_prompts()

                # Step 5: render each prompt to an image (progress 0.4 -> 0.6).
                self.progress(0.4, desc="Generating images")
                self.log("Generating images")
                for i, prompt in enumerate(self.image_prompts, 1):
                    self.progress(0.4 + 0.2 * (i / len(self.image_prompts)),
                                desc=f"Generating image {i}/{len(self.image_prompts)}")
                    self.log(f"Generating image {i}/{len(self.image_prompts)}")
                    self.generate_image(prompt)

                # Step 6: narration audio.
                self.progress(0.6, desc="Creating speech")
                self.log("Generating speech")
                self.generate_speech(self.script)

                # Step 7: subtitles (only if enabled and TTS audio exists).
                self.progress(0.7, desc="Generating subtitles")
                if self.subtitles_enabled and hasattr(self, 'tts_path') and os.path.exists(self.tts_path):
                    self.subtitle_data = self.generate_subtitles(self.tts_path)

                    if self.subtitle_data:
                        try:
                            # Persist word-level timings for debugging/reuse.
                            if 'wordlevel' in self.subtitle_data:
                                word_subtitles_path = os.path.join(self.generation_folder, "word_subtitles.json")
                                with open(word_subtitles_path, 'w') as f:
                                    json.dump(self.subtitle_data['wordlevel'], f, indent=2)
                                self.log(f"Saved word-level subtitles to: {word_subtitles_path}")

                            # Persist line-level timings as well.
                            if 'linelevel' in self.subtitle_data:
                                line_subtitles_path = os.path.join(self.generation_folder, "line_subtitles.json")
                                with open(line_subtitles_path, 'w') as f:
                                    json.dump(self.subtitle_data['linelevel'], f, indent=2)
                                self.log(f"Saved line-level subtitles to: {line_subtitles_path}")
                        except Exception as e:
                            self.log(warning(f"Error saving subtitles to generation folder: {str(e)}"))

                # Step 8: dump all generated text content for inspection.
                self.progress(0.75, desc="Saving generation data")
                try:
                    content_path = os.path.join(self.generation_folder, "content.txt")
                    with open(content_path, 'w', encoding='utf-8') as f:
                        f.write(f"NICHE: {self.niche}\n\n")
                        f.write(f"LANGUAGE: {self.language}\n\n")
                        f.write(f"GENERATED TOPIC: {self.subject}\n\n")
                        f.write(f"GENERATED SCRIPT:\n{self.script}\n\n")
                        f.write(f"GENERATED PROMPTS:\n")
                        for i, prompt in enumerate(self.image_prompts, 1):
                            f.write(f"{i}. {prompt}\n")
                        f.write("\n")
                        f.write(f"GENERATED METADATA:\n")
                        for key, value in self.metadata.items():
                            f.write(f"{key}: {value}\n")
                    self.log(f"Saved content.txt to: {content_path}")
                except Exception as e:
                    # Non-fatal: the video can still be produced.
                    self.log(warning(f"Error saving content.txt: {str(e)}"))

                # Step 9: final render.
                self.progress(0.8, desc="Creating final video")
                self.log("Combining all elements into final video (optimized rendering)")

                # Free memory before the render, which is the peak-RSS step.
                import gc
                gc.collect()

                path = self.combine()

                self.progress(0.95, desc="Finalizing")
                self.log(f"Video generation complete. Files saved in: {self.generation_folder}")

                # Result bundle consumed by the Gradio callback.
                return {
                    'video_path': path,
                    'generation_folder': self.generation_folder,
                    'title': self.metadata['title'],
                    'description': self.metadata['description'],
                    'subject': self.subject,
                    'script': self.script,
                    'logs': self.logs
                }
            except Exception as e:
                error_msg = f"Error during video generation step: {str(e)}"
                self.log(error(error_msg))
                # Kill stray ffmpeg/magick workers before propagating.
                self.cleanup_resources()
                raise Exception(error_msg)

        except Exception as e:
            error_msg = f"Error during video generation: {str(e)}"
            self.log(error(error_msg))
            raise Exception(error_msg)
| | |
| | def cleanup_resources(self): |
| | """Clean up any resources to prevent memory leaks.""" |
| | try: |
| | |
| | import psutil |
| | for proc in psutil.process_iter(): |
| | try: |
| | |
| | if 'magick' in proc.name().lower() or 'ffmpeg' in proc.name().lower(): |
| | proc.kill() |
| | except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): |
| | pass |
| | |
| | |
| | import gc |
| | gc.collect() |
| | except Exception as e: |
| | self.log(warning(f"Error during resource cleanup: {str(e)}")) |
| | pass |
| |
|
| | |
def get_text_generator_models(generator):
    """Return the list of model identifiers offered by a text generator.

    Args:
        generator: Backend key ("gemini", "g4f", or "openai").

    Returns:
        list[str]: Known model names, or ["default"] for unknown backends.
    """
    gemini_models = [
        "gemini-2.0-flash",
        "gemini-2.0-flash-lite",
        "gemini-1.5-flash",
        "gemini-1.5-flash-8b",
        "gemini-1.5-pro",
    ]
    g4f_models = [
        "gpt-4",
        "gpt-4o",
        "gpt-3.5-turbo",
        "llama-3-70b-chat",
        "claude-3-opus-20240229",
        "claude-3-sonnet-20240229",
        "claude-3-haiku-20240307",
    ]
    openai_models = ["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"]

    catalog = {
        "gemini": gemini_models,
        "g4f": g4f_models,
        "openai": openai_models,
    }
    return catalog.get(generator, ["default"])
| |
|
def get_image_generator_models(generator):
    """Return the list of model identifiers offered by an image generator.

    Args:
        generator: Backend key ("prodia", "hercai", "g4f", "segmind", or
            "pollinations").

    Returns:
        list[str]: Known model names, or ["default"] for unknown backends.
    """
    catalog = {
        "prodia": ["sdxl", "realvisxl", "juggernaut", "dreamshaper", "dalle"],
        "hercai": ["v1", "v2", "v3", "lexica"],
        "g4f": ["flux", "dall-e-3", "dall-e-2", "midjourney"],
        "segmind": ["sdxl-turbo", "realistic-vision", "sd3"],
        "pollinations": ["default"],
    }
    return catalog.get(generator, ["default"])
| |
|
def get_tts_voices(engine):
    """Return the selectable voices (or language codes) for a TTS engine.

    Args:
        engine: TTS backend key ("elevenlabs", "openai", "edge", or "gtts").

    Returns:
        list[str]: Voice names (language codes for gTTS), or ["default"]
        for unknown engines.
    """
    elevenlabs_voices = [
        "Sarah", "Brian", "Lily", "Monika Sogam", "George",
        "River", "Matilda", "Will", "Jessica",
    ]
    openai_voices = ["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
    edge_voices = [
        "en-US-AriaNeural", "en-US-GuyNeural",
        "en-GB-SoniaNeural", "en-AU-NatashaNeural",
    ]
    gtts_languages = ["en", "es", "fr", "de", "it", "pt", "ru", "ja", "zh", "hi"]

    voice_map = {
        "elevenlabs": elevenlabs_voices,
        "openai": openai_voices,
        "edge": edge_voices,
        "gtts": gtts_languages,
    }
    return voice_map.get(engine, ["default"])
| |
|
| | |
def create_interface():
    """Build and return the Gradio Blocks UI for the Shorts generator.

    Left column: content/generator settings. Right column: video preview,
    metadata, API keys, and the process log. All event handlers are wired
    inside the Blocks context.
    """
    with gr.Blocks(theme=gr.themes.Soft(primary_hue="indigo", radius_size="lg"), title="YouTube Shorts Generator") as demo:
        with gr.Row():
            gr.Markdown(
                """
                # 📱 YouTube Shorts Generator
                Generate engaging YouTube Shorts videos with AI. Just provide a niche and language to get started!
                """
            )

        with gr.Row(equal_height=True):
            # --- Left column: all generation settings ---
            with gr.Column(scale=2, min_width=500):
                with gr.Group():
                    gr.Markdown("### 📝 Content")
                    niche = gr.Textbox(
                        label="Niche/Topic",
                        placeholder="What's your video about?",
                        value="Historical Facts"
                    )
                    language = gr.Dropdown(
                        choices=["English", "Spanish", "French", "German", "Italian", "Portuguese",
                                "Russian", "Japanese", "Chinese", "Hindi"],
                        label="Language",
                        value="English"
                    )

                # Backend selection for text/image/speech plus audio and
                # subtitle options, one tab each.
                with gr.Group():
                    gr.Markdown("### 🔧 Generator Settings")
                    with gr.Tabs():
                        with gr.TabItem("Text"):
                            text_gen = gr.Dropdown(
                                choices=["g4f", "gemini", "openai"],
                                label="Text Generator",
                                value="g4f"
                            )
                            text_model = gr.Dropdown(
                                choices=get_text_generator_models("g4f"),
                                label="Text Model",
                                value="gpt-4"
                            )

                        with gr.TabItem("Image"):
                            image_gen = gr.Dropdown(
                                choices=["g4f", "prodia", "hercai", "segmind", "pollinations"],
                                label="Image Generator",
                                value="g4f"
                            )
                            image_model = gr.Dropdown(
                                choices=get_image_generator_models("g4f"),
                                label="Image Model",
                                value="flux"
                            )

                        with gr.TabItem("Speech"):
                            tts_engine = gr.Dropdown(
                                choices=["edge", "elevenlabs", "gtts", "openai"],
                                label="Speech Generator",
                                value="edge"
                            )
                            tts_voice = gr.Dropdown(
                                choices=get_tts_voices("edge"),
                                label="Voice",
                                value="en-US-AriaNeural"
                            )

                        with gr.TabItem("Audio"):
                            enable_music = gr.Checkbox(label="Enable Background Music", value=True)
                            # Prefer "random" when the music library offers it.
                            music_choices = get_music_files()
                            default_music = "none" if "random" not in music_choices else "random"
                            music_file = gr.Dropdown(
                                choices=music_choices,
                                label="Background Music",
                                value=default_music,
                                interactive=True
                            )
                            music_volume = gr.Slider(
                                minimum=0.0,
                                maximum=1.0,
                                value=0.1,
                                step=0.05,
                                label="Background Music Volume"
                            )

                        with gr.TabItem("Subtitles"):
                            subtitles_enabled = gr.Checkbox(label="Enable Subtitles", value=True)
                            highlighting_enabled = gr.Checkbox(label="Enable Word Highlighting", value=True)
                            subtitle_font = gr.Dropdown(
                                choices=get_font_files(),
                                label="Font",
                                value="random"
                            )
                            with gr.Row():
                                font_size = gr.Slider(
                                    minimum=40,
                                    maximum=120,
                                    value=80,
                                    step=5,
                                    label="Font Size"
                                )
                                subtitle_position = gr.Dropdown(
                                    choices=["bottom", "middle", "top"],
                                    label="Position",
                                    value="bottom"
                                )
                            with gr.Row():
                                text_color = gr.ColorPicker(label="Text Color", value="#FFFFFF")
                                highlight_color = gr.ColorPicker(label="Highlight Color", value="#0000FF")

                # Kick off generation.
                generate_btn = gr.Button("🎬 Generate Video", variant="primary", size="lg")

            # --- Right column: outputs ---
            with gr.Column(scale=1, min_width=300):
                with gr.Tabs():
                    with gr.TabItem("Video"):
                        # 9:16 preview sized for a Shorts-style video.
                        video_output = gr.Video(label="Generated Video", height=580, width=330)

                    with gr.TabItem("Metadata"):
                        title_output = gr.Textbox(label="Title", lines=2)
                        description_output = gr.Textbox(label="Description", lines=4)
                        script_output = gr.Textbox(label="Script", lines=8)

                    # API keys default to environment values loaded by dotenv.
                    with gr.TabItem("🔑 API Keys"):
                        gemini_api_key = gr.Textbox(
                            label="Gemini API Key",
                            type="password",
                            value=os.environ.get("GEMINI_API_KEY", "")
                        )
                        assemblyai_api_key = gr.Textbox(
                            label="AssemblyAI API Key",
                            type="password",
                            value=os.environ.get("ASSEMBLYAI_API_KEY", "")
                        )
                        elevenlabs_api_key = gr.Textbox(
                            label="ElevenLabs API Key",
                            type="password",
                            value=os.environ.get("ELEVENLABS_API_KEY", "")
                        )
                        segmind_api_key = gr.Textbox(
                            label="Segmind API Key",
                            type="password",
                            value=os.environ.get("SEGMIND_API_KEY", "")
                        )
                        openai_api_key = gr.Textbox(
                            label="OpenAI API Key",
                            type="password",
                            value=os.environ.get("OPENAI_API_KEY", "")
                        )

                    with gr.TabItem("Log"):
                        log_output = gr.Textbox(label="Process Log", lines=15, max_lines=100)

        # Cascade the model/voice dropdowns when the backend changes.
        def update_text_models(generator):
            return gr.Dropdown(choices=get_text_generator_models(generator))

        def update_image_models(generator):
            return gr.Dropdown(choices=get_image_generator_models(generator))

        def update_tts_voices(engine):
            return gr.Dropdown(choices=get_tts_voices(engine))

        # Wire the dropdown-refresh handlers.
        text_gen.change(fn=update_text_models, inputs=text_gen, outputs=text_model)
        image_gen.change(fn=update_image_models, inputs=image_gen, outputs=image_model)
        tts_engine.change(fn=update_tts_voices, inputs=tts_engine, outputs=tts_voice)

        # Main click handler: validates input, runs the pipeline, and maps
        # the result (or error) onto the output components.
        def generate_youtube_short(niche, language, text_gen, text_model, image_gen, image_model,
                                  tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                                  subtitle_font, font_size, subtitle_position,
                                  text_color, highlight_color, music_file,
                                  enable_music, music_volume,
                                  gemini_api_key, assemblyai_api_key,
                                  elevenlabs_api_key, segmind_api_key, openai_api_key,
                                  progress=gr.Progress()):
            # Reject empty topics before doing any work.
            if not niche.strip():
                return {
                    video_output: None,
                    title_output: "ERROR: Please enter a niche/topic",
                    description_output: "",
                    script_output: "",
                    log_output: "Error: Niche/Topic is required. Please enter a valid topic and try again."
                }

            # Bundle the per-request API keys for the pipeline object.
            api_keys = {
                'gemini': gemini_api_key,
                'assemblyai': assemblyai_api_key,
                'elevenlabs': elevenlabs_api_key,
                'segmind': segmind_api_key,
                'openai': openai_api_key
            }

            try:
                # Construct the pipeline with every UI setting.
                yt = YouTube(
                    niche=niche,
                    language=language,
                    text_gen=text_gen,
                    text_model=text_model,
                    image_gen=image_gen,
                    image_model=image_model,
                    tts_engine=tts_engine,
                    tts_voice=tts_voice,
                    subtitle_font=subtitle_font,
                    font_size=font_size,
                    text_color=text_color,
                    highlight_color=highlight_color,
                    subtitles_enabled=subtitles_enabled,
                    highlighting_enabled=highlighting_enabled,
                    subtitle_position=subtitle_position,
                    music_file=music_file,
                    enable_music=enable_music,
                    music_volume=music_volume,
                    api_keys=api_keys,
                    progress=progress
                )

                # Run the full generation pipeline.
                result = yt.generate_video()

                # Guard against a missing/failed output file.
                if not result or not result.get('video_path') or not os.path.exists(result.get('video_path', '')):
                    return {
                        video_output: None,
                        title_output: "ERROR: Video generation failed",
                        description_output: "",
                        script_output: "",
                        log_output: "\n".join(yt.logs)
                    }

                return {
                    video_output: result['video_path'],
                    title_output: result['title'],
                    description_output: result['description'],
                    script_output: result['script'],
                    log_output: "\n".join(result['logs'])
                }

            except Exception as e:
                # Surface the full traceback in the log tab for debugging.
                import traceback
                error_details = f"Error: {str(e)}\n\n{traceback.format_exc()}"
                return {
                    video_output: None,
                    title_output: f"ERROR: {str(e)}",
                    description_output: "",
                    script_output: "",
                    log_output: error_details
                }

        # Input order here must match the handler's parameter order.
        generate_btn.click(
            fn=generate_youtube_short,
            inputs=[
                niche, language, text_gen, text_model, image_gen, image_model,
                tts_engine, tts_voice, subtitles_enabled, highlighting_enabled,
                subtitle_font, font_size, subtitle_position, text_color, highlight_color, music_file,
                enable_music, music_volume, gemini_api_key, assemblyai_api_key, elevenlabs_api_key, segmind_api_key, openai_api_key
            ],
            outputs=[video_output, title_output, description_output, script_output, log_output]
        )

        # Recompute the music default for the example rows.
        music_choices = get_music_files()
        default_music = "none" if "random" not in music_choices else "random"

        gr.Examples(
            [
                ["Historical Facts", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#0000FF", default_music, True, 0.1],
                ["Cooking Tips", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-AriaNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#FF0000", default_music, True, 0.1],
                ["Technology News", "English", "g4f", "gpt-4", "g4f", "flux", "edge", "en-US-GuyNeural", True, True, "default", 80, "bottom", "#FFFFFF", "#00FF00", default_music, True, 0.1],
            ],
            [niche, language, text_gen, text_model, image_gen, image_model, tts_engine, tts_voice,
             subtitles_enabled, highlighting_enabled, subtitle_font, font_size,
             subtitle_position, text_color, highlight_color, music_file, enable_music, music_volume],
            label="Quick Start Templates"
        )

    return demo
| |
|
| | |
if __name__ == "__main__":
    # Make sure every asset/output directory exists before the UI starts.
    for directory in (STATIC_DIR, MUSIC_DIR, FONTS_DIR, STORAGE_DIR):
        os.makedirs(directory, exist_ok=True)

    # Build the Gradio app and serve it.
    demo = create_interface()
    demo.launch()