Spaces:
Paused
Paused
| import os | |
| import praw | |
| import time | |
| import logging | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| import re | |
| from pydub import AudioSegment | |
| import asyncio | |
| import tempfile | |
| import edge_tts | |
| import random | |
| import assemblyai as aai | |
| from moviepy.config import change_settings | |
| from moviepy.editor import * | |
| from moviepy.editor import TextClip, CompositeVideoClip, AudioFileClip, ColorClip | |
| from PIL import Image, ImageDraw, ImageFont | |
| import numpy as np | |
| from huggingface_hub import HfApi, login | |
| import requests | |
# Configure logging exactly once: timestamped records on a stream handler.
# (Repeated basicConfig() calls are no-ops once the root logger has handlers,
# so a single call is sufficient.)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Best-effort installation of the DejaVu font package at startup.
# os.system returns the command's exit status; surface failures instead of
# silently ignoring them.
_apt_status = os.system("apt-get update && apt-get install -y fonts-dejavu")
if _apt_status != 0:
    logger.warning(f"Font package installation exited with status {_apt_status}")

FONT_PATH = "MouldyCheeseRegular-WyMWG.ttf"
# Verify font file existence (fail fast at import time if it is missing)
if not os.path.exists(FONT_PATH):
    raise FileNotFoundError(f"Font file not found: {FONT_PATH}")
print(f"Using font at: {FONT_PATH}")

change_settings({"IMAGEMAGICK_BINARY": None})  # Disable ImageMagick
# Load environment variables (Hugging Face Secrets)
load_dotenv()
REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID")
REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET")
REDDIT_USER_AGENT = os.getenv("REDDIT_USER_AGENT")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")

# Initialize Reddit client.
# `reddit` is pre-bound to None so the name always exists even when the
# constructor raises; otherwise later code would hit a NameError.
reddit = None
try:
    reddit = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )
    # Test the connection (one call; the result is printed for diagnostics)
    print(reddit.user.me())
except Exception as e:
    logger.error(f"Failed to initialize Reddit client: {e}")
    logger.error("Please check your Reddit API credentials in the .env file")
    logger.error("Required environment variables: REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT")
# Voice options with descriptions.
# Keys are the voice identifiers handed to the TTS call; values are the
# human-readable descriptions. Insertion order is preserved.
VOICE_OPTIONS = {
    # American English Voices
    "en-US-GuyNeural": "Male (American)",
    "en-US-JennyNeural": "Female (American)",
    "en-US-AriaNeural": "Female (American)",
    # British English Voices
    "en-GB-RyanNeural": "Male (British)",
    "en-GB-SoniaNeural": "Female (British)",
    "en-GB-LibbyNeural": "Female (British)",
    "en-GB-AlfieNeural": "Male (British)",
    "en-GB-ElliotNeural": "Male (British)",
    # Australian English Voices
    "en-AU-NatashaNeural": "Female (Australian)",
    "en-AU-WilliamNeural": "Male (Australian)",
    "en-AU-AnnetteNeural": "Female (Australian)",
    "en-AU-CarlyNeural": "Female (Australian)",
    "en-AU-DuncanNeural": "Male (Australian)",
    # Indian English Voices
    "en-IN-NeerjaNeural": "Female (Indian)",
    "en-IN-PrabhatNeural": "Male (Indian)",
    # Irish English Voices
    "en-IE-ConnorNeural": "Male (Irish)",
    "en-IE-EmilyNeural": "Female (Irish)",
    # Canadian English Voices
    "en-CA-ClaraNeural": "Female (Canadian)",
    "en-CA-LiamNeural": "Male (Canadian)"
}
# Background video options.
# Keys are the background type names; values are their descriptions.
BACKGROUND_OPTIONS = {
    "Green": "Solid green background",
    "Black": "Solid black background",
    "Minecraft": "Minecraft gameplay",
    "Cake Making": "Oddly Satisfying Cake Making",
    "Satisfying ART": "Satisfying Art background",
    "Pexels": "Use stock videos based on keywords from Pexels"
}
# Directory to save audio files
os.makedirs("audio_outputs", exist_ok=True)

HF_TOKEN = os.getenv("HF_TOKEN")  # Fetch the token from Hugging Face Secrets
if HF_TOKEN:
    login(HF_TOKEN)
else:
    # Calling login() without a token falls back to an interactive prompt,
    # which cannot be answered in a headless deployment; skip it instead.
    logger.warning("HF_TOKEN is not set; skipping Hugging Face login")

# Initialize Hugging Face API
hf_api = HfApi()
def clean_text(text):
    """Remove emojis and unsupported characters from text.

    Args:
        text: Input string. ``None`` or an empty string is accepted.

    Returns:
        The text with every character outside the Basic Multilingual Plane
        removed, and then every character that is not a word character,
        whitespace, or one of ``. , ! ? ' " -`` removed. Returns ``""`` for
        ``None``/empty input so callers never have to guard against ``None``.
    """
    if not text:
        return ""
    # Astral-plane code points (where most emoji live) are dropped first.
    text = re.sub(r'[\U00010000-\U0010FFFF]+', '', text)
    # Then drop everything that is not in the allow-list of characters.
    return re.sub(r'[^\w\s.,!?\'"-]', '', text)
def load_nsfw_words(file_path="nsfw_words.txt"):
    """Load NSFW words from a file.

    Args:
        file_path: Path to a text file with one word (or phrase) per line.

    Returns:
        A list of the non-blank lines, stripped and lower-cased, in file
        order. Returns an empty list (and logs a warning) when the file
        does not exist.
    """
    if not os.path.exists(file_path):
        logger.warning(f"NSFW words file not found: {file_path}")
        return []
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    with open(file_path, "r", encoding="utf-8") as f:
        return [line.strip().lower() for line in f if line.strip()]
# Load NSFW words dynamically.
# The list is read once at import time; filter_nsfw_words() and
# contains_nsfw_words() both read this module-level list.
NSFW_WORDS = load_nsfw_words()
def filter_nsfw_words(text, words=None):
    """Replace NSFW words with [beep].

    Matching is case-insensitive and whole-word only: a listed word is
    replaced only when it is not directly preceded or followed by an
    alphanumeric character.

    Args:
        text: Text to censor. Falsy values (``None``, ``""``) are returned
            unchanged.
        words: Optional iterable of words/phrases to censor. Defaults to the
            module-level ``NSFW_WORDS`` list.

    Returns:
        The text with every whole-word occurrence replaced by ``[beep]``.
    """
    if not text:
        return text
    if words is None:
        words = NSFW_WORDS
    # Longest entries first so a phrase wins over a shorter word that it
    # starts with; the secondary sort key keeps the pattern deterministic.
    candidates = sorted({w for w in words if w}, key=lambda w: (-len(w), w))
    if not candidates:
        return text
    alternation = "|".join(re.escape(w) for w in candidates)
    # [^\W_] is "alphanumeric" (word character minus underscore), mirroring
    # str.isalnum(). Substituting on the original text in a single pass
    # avoids index drift between the text and a lower-cased copy (lower()
    # can change string length) and cannot loop on the replacement itself.
    pattern = re.compile(
        rf"(?<![^\W_])(?:{alternation})(?![^\W_])",
        re.IGNORECASE,
    )
    return pattern.sub("[beep]", text)
def contains_nsfw_words(text):
    """Check if text contains NSFW words.

    This is a case-insensitive *substring* check against the module-level
    ``NSFW_WORDS`` list (it does not require whole-word matches).

    Args:
        text: Text to inspect. ``None`` or an empty string yields ``False``.

    Returns:
        ``True`` if any listed word occurs in the lower-cased text,
        otherwise ``False``.
    """
    if not text:
        return False
    lowered = text.lower()
    return any(word in lowered for word in NSFW_WORDS)
# Word count helper for the story input
def count_words(text):
    """Return a label such as ``"12 words"`` for the given text.

    ``None`` is treated as an empty string; words are whitespace-separated.
    """
    tokens = (text or "").split()
    return f"{len(tokens)} words"
def fetch_top_post_and_comments(subreddit_url, filter_type="hot", time_filter="day", max_duration=45, min_duration=30, max_retries=10):
    """Fetch the top post and comments from a subreddit URL with strict duration enforcement.

    A random non-NSFW post is drawn from up to 20 listed posts; comments are
    then added while the estimated spoken duration stays within
    ``max_duration``. The draw is repeated up to ``max_retries`` times.

    Args:
        subreddit_url: Subreddit URL; the last path segment is used as the
            subreddit name.
        filter_type: ``"hot"`` uses the hot listing; any other value uses the
            top listing with ``time_filter``.
        time_filter: Time filter passed to the top listing.
        max_duration: Upper bound (seconds) for the estimated audio length.
        min_duration: Lower bound (seconds) for the estimated audio length.
        max_retries: Maximum number of posts to try.

    Returns:
        ``(post_content, top_comments)`` where ``post_content`` is the cleaned
        title (plus body text, if any) and ``top_comments`` is a non-empty
        list of cleaned comment strings.

    Raises:
        ValueError: If no suitable post is found or the Reddit API fails.
    """
    try:
        # Extract subreddit name
        subreddit_name = subreddit_url.rstrip("/").split("/")[-1]
        subreddit = reddit.subreddit(subreddit_name)
        logger.info(f"Fetching posts from subreddit: {subreddit_name}")

        # Fetch posts based on filter type
        if filter_type == "hot":
            listing = subreddit.hot(limit=20)
        else:
            listing = subreddit.top(limit=20, time_filter=time_filter)
        # Keep only posts that are NOT marked NSFW
        candidates = [post for post in listing if not post.over_18]

        for attempt in range(1, max_retries + 1):
            logger.info(f"Attempt {attempt}/{max_retries} to find suitable post")
            if not candidates:
                raise ValueError("No suitable posts found.")

            # Randomly select a post and remove it from the pool
            top_post = candidates.pop(random.randrange(len(candidates)))

            # Clean and filter title and selftext
            title = filter_nsfw_words(clean_text(top_post.title))
            selftext = filter_nsfw_words(clean_text(top_post.selftext))
            post_content = f"{title}"
            if selftext:
                post_content += f". {selftext}"

            total_duration = estimate_audio_duration(post_content)
            logger.info(f"Post content duration: {total_duration}s")
            if total_duration > max_duration:
                logger.debug(f"Post content too long ({total_duration}s), trying another")
                continue

            # Collect comments while staying within duration limits
            top_comments = []
            top_post.comments.replace_more(limit=0)
            for comment in top_post.comments:
                if not isinstance(comment, praw.models.Comment):
                    continue
                comment_text = clean_text(comment.body)
                # Skip deleted/removed placeholders and comments that are
                # empty after cleaning (clean_text strips the brackets).
                if not comment_text.strip() or comment_text.lower() in ("deleted", "[deleted]", "removed"):
                    continue
                # Filter NSFW words
                comment_text = filter_nsfw_words(comment_text)
                comment_duration = estimate_audio_duration(comment_text)
                logger.debug(f"Comment duration: {comment_duration}s")
                if total_duration + comment_duration > max_duration:
                    if total_duration >= min_duration:
                        break
                    # Still under the minimum: skip this long comment and
                    # keep looking for a shorter one, as the post-URL
                    # fetcher does.
                    continue
                top_comments.append(comment_text)
                total_duration += comment_duration

            # Verify final duration
            if min_duration <= total_duration <= max_duration and top_comments:
                logger.info(f"Found suitable content with {len(top_comments)} comments")
                return post_content, top_comments
            logger.warning(f"Content duration ({total_duration}s) outside acceptable range or no valid comments")

        raise ValueError("Unable to find suitable post within retry limit.")
    except praw.exceptions.PRAWException as e:
        logger.error(f"Reddit API error: {e}")
        raise ValueError(f"Failed to fetch Reddit content: {e}") from e
    except ValueError:
        # Our own "nothing suitable" errors are expected outcomes; pass them
        # through unchanged instead of reporting them as unexpected.
        raise
    except Exception as e:
        logger.error(f"Unexpected error fetching content: {e}")
        raise ValueError(f"Failed to fetch Reddit content: {e}") from e
def fetch_post_and_comments_from_url(post_url, max_duration=45, min_duration=30):
    """Load a single Reddit submission by URL and pick comments that fit the duration window.

    A new PRAW client is built from the module-level credentials. Comments
    are appended in listing order until the estimated spoken duration
    reaches ``min_duration``; a comment that would push the total past
    ``max_duration`` is skipped.

    Returns:
        ``(post_content, top_comments)``.

    Raises:
        ValueError: If the post is NSFW, the post alone is longer than
            ``max_duration``, the total stays below ``min_duration``, or no
            comment was usable.
    """
    client = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )
    post = client.submission(url=post_url)
    logger.info(f"Fetching post from URL: {post_url}")

    # NSFW posts are rejected outright.
    if post.over_18:
        logger.warning("Post is marked as NSFW")
        raise ValueError("The post is marked as NSFW and cannot be processed.")

    # Sanitize the title and the body text.
    headline = filter_nsfw_words(clean_text(post.title))
    body_text = filter_nsfw_words(clean_text(post.selftext))
    post_content = f"{headline}. {body_text}" if body_text else f"{headline}"

    total_duration = estimate_audio_duration(post_content)
    logger.info(f"Post content duration: {total_duration}s")

    # A post that is already too long on its own is an error (no truncation).
    if total_duration > max_duration:
        logger.warning(f"Post content too long ({total_duration}s > {max_duration}s)")
        raise ValueError(f"Post content alone exceeds maximum duration ({total_duration}s > {max_duration}s)")

    top_comments = []
    post.comments.replace_more(limit=0)
    for entry in post.comments:
        if not isinstance(entry, praw.models.Comment):
            continue

        candidate = clean_text(entry.body)
        # Deleted comments carry no content.
        if candidate.lower() in ["deleted", "[deleted]"]:
            continue

        candidate = filter_nsfw_words(candidate)
        candidate_duration = estimate_audio_duration(candidate)
        logger.debug(f"Potential comment duration: {candidate_duration}s")

        projected = total_duration + candidate_duration
        if projected > max_duration:
            if total_duration >= min_duration:
                logger.info(f"Reached sufficient duration ({total_duration}s), skipping remaining comments")
                break
            logger.debug(f"Comment would exceed max duration ({projected}s > {max_duration}s), but haven't reached min duration yet. Looking for shorter comments...")
            continue

        top_comments.append(candidate)
        total_duration = projected
        logger.debug(f"Added comment. New total duration: {total_duration}s")

        # Once the minimum is reached there is no need to read further.
        if total_duration >= min_duration:
            logger.info(f"Reached minimum duration ({total_duration}s)")
            break

    # Final validation of what was gathered.
    if total_duration < min_duration:
        logger.warning(f"Content too short ({total_duration}s < {min_duration}s)")
        raise ValueError(f"Content is too short ({total_duration}s) to generate audio in the desired duration range ({min_duration}s-{max_duration}s).")
    if not top_comments:
        logger.warning("No valid comments found")
        raise ValueError("No valid comments found in the post.")

    logger.info(f"Successfully fetched post with {len(top_comments)} comments. Total duration: {total_duration}s")
    return post_content, top_comments
def create_video_from_story(story_text, selected_voice, rate, pitch, background, pexels_keywords=None):
    """Turn free-form story text into a narrated, subtitled video.

    Steps: sanitize the text, synthesize speech, build subtitle clips from
    the audio, then render the video over the chosen background.

    Args:
        story_text: The story to narrate.
        selected_voice: Voice selection passed to ``text_to_speech``.
        rate: Speech rate adjustment passed to ``text_to_speech``.
        pitch: Pitch adjustment passed to ``text_to_speech``.
        background: Background type passed to ``create_video_with_background``.
        pexels_keywords: Optional keywords passed through for Pexels backgrounds.

    Returns:
        ``(video_path, video_url, message)``. On failure the first two
        elements are ``None`` and ``message`` describes the failing step.
    """
    # Guard against None/blank input before it reaches the text pipeline.
    if not story_text or not story_text.strip():
        return None, None, "Story text is empty"

    cleaned_story = filter_nsfw_words(clean_text(story_text))
    audio_path = asyncio.run(text_to_speech(cleaned_story, voice=selected_voice, rate=rate, pitch=pitch))
    if not audio_path:
        return None, None, "Failed to generate audio from story"

    try:
        subtitles = generate_subtitles(audio_path)
        if subtitles is None:
            return None, None, "Failed to generate subtitles"

        timestamp = int(time.time())
        output_path = f"/tmp/story_{timestamp}.mp4"
        video_path, video_url = create_video_with_background(
            audio_path=audio_path,
            subtitles=subtitles,
            subreddit_url="story",
            selected_font="Mouldy Cheese",
            background=background,
            output_path=output_path,
            pexels_keywords=pexels_keywords
        )
        if not video_path:
            return None, None, "Failed to create video"
        return video_path, video_url, "Video generated successfully from story!"
    finally:
        # Remove the temporary narration on every exit path (success,
        # failure, or exception) so failed runs do not leave audio behind.
        try:
            if os.path.exists(audio_path):
                os.remove(audio_path)
        except OSError as e:
            logger.warning(f"Failed to clean up audio: {e}")
def estimate_audio_duration(text, words_per_second=3.5, pause_per_sentence=1.0, pause_per_comment=1.5):
    """Approximate how long the text takes to speak, in seconds.

    The estimate is the sum of: speaking time (whitespace-separated words
    divided by ``words_per_second``), one ``pause_per_sentence`` for each
    ``.``, ``!`` or ``?`` character, and one ``pause_per_comment`` for each
    occurrence of ``". Comments: "``.
    """
    speaking_time = len(text.split()) / words_per_second
    sentence_marks = sum(text.count(mark) for mark in ('.', '!', '?'))
    sentence_pauses = sentence_marks * pause_per_sentence
    # Each ". Comments: " marker counts as one comment transition.
    comment_pauses = text.count('. Comments: ') * pause_per_comment
    return speaking_time + sentence_pauses + comment_pauses
async def text_to_speech(text, voice="en-US-GuyNeural - Male (American)", rate=0, pitch=0):
    """Convert text to speech using edge-tts.

    Args:
        text: Text to synthesize.
        voice: Either a bare voice ID or a display string of the form
            ``"<voice id> - <description>"``; only the ID part is used.
        rate: Speech rate adjustment in percent (sent as e.g. ``"+10%"``).
        pitch: Pitch adjustment in Hz (sent as e.g. ``"-5Hz"``).

    Returns:
        Path of the generated MP3 file under ``temp/``, or ``None`` if
        synthesis failed for any reason (the error is logged).
    """
    try:
        # Extract voice ID from the display string
        # (e.g., "en-US-GuyNeural - Male (American)" -> "en-US-GuyNeural")
        voice_id = voice.split(" - ", 1)[0]

        # Format rate and pitch as required by edge-tts. The "+d" format
        # code only accepts integers, so coerce first: numeric inputs may
        # arrive as floats or None (the UI wiring is not visible here).
        rate_str = f"{int(round(rate or 0)):+d}%"
        pitch_str = f"{int(round(pitch or 0)):+d}Hz"

        # Create output directory if it doesn't exist
        os.makedirs("temp", exist_ok=True)

        # Nanosecond timestamp: two requests within the same second must
        # not overwrite each other's file.
        output_file = f"temp/speech_{time.time_ns()}.mp3"
        logger.info(f"Generating TTS with voice: {voice_id}, rate: {rate_str}, pitch: {pitch_str}")

        # Configure voice settings
        communicate = edge_tts.Communicate(text, voice_id, rate=rate_str, volume='+0%', pitch=pitch_str)
        # Convert to audio
        await communicate.save(output_file)

        # An empty file means nothing was synthesized; treat it as failure.
        if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
            logger.info(f"Successfully generated audio file: {output_file}")
            return output_file
        logger.error("Failed to generate audio file")
        return None
    except Exception as e:
        logger.error(f"Error in text_to_speech: {e}")
        return None
def _render_subtitle_image(context_words, highlight_index, font, size=(1000, 150)):
    """Draw one subtitle frame: the context words in white, one word in yellow.

    Args:
        context_words: Words shown together in this frame.
        highlight_index: Position within ``context_words`` of the word that
            is currently being spoken.
        font: PIL font object used for measuring and drawing.
        size: ``(width, height)`` of the transparent canvas.

    Returns:
        The rendered RGBA image as a NumPy array.
    """
    image = Image.new('RGBA', size, (0, 0, 0, 0))  # Fully transparent background
    draw = ImageDraw.Draw(image)

    # The layout width is measured on a string in which the highlighted word
    # is wrapped in brackets; the brackets themselves are never drawn.
    # NOTE(review): kept as-is to preserve the existing horizontal placement;
    # confirm whether measuring the drawn text was intended.
    measured_text = " ".join(
        f"[{word}]" if idx == highlight_index else word
        for idx, word in enumerate(context_words)
    )
    bbox = draw.textbbox((0, 0), measured_text, font=font)
    x = (size[0] - (bbox[2] - bbox[0])) // 2
    y = (size[1] - (bbox[3] - bbox[1])) // 2

    cursor_x = x
    for idx, word in enumerate(context_words):
        word_with_space = word + " "
        word_bbox = draw.textbbox((0, 0), word_with_space, font=font)
        # Highlight by position, not by text, so a repeated word in the
        # window is not highlighted more than once.
        color = (255, 255, 0, 255) if idx == highlight_index else (255, 255, 255, 255)
        draw.text((cursor_x, y), word_with_space, font=font, fill=color)
        cursor_x += word_bbox[2] - word_bbox[0]

    return np.array(image)


def generate_subtitles(audio_path):
    """Transcribe the audio with AssemblyAI and build one subtitle clip per word.

    Each clip shows a small window of neighbouring words with the spoken
    word highlighted, positioned at the centre and timed from the word's
    start/end timestamps (milliseconds converted to seconds).

    Args:
        audio_path: Path to the audio file to transcribe.

    Returns:
        A list of timed ``ImageClip`` objects, or ``None`` when the path is
        invalid, the API key is missing, transcription yields no words, no
        clip could be created, or any other error occurs (all are logged).
    """
    try:
        if not audio_path or not os.path.exists(audio_path):
            logger.error(f"Invalid audio path: {audio_path}")
            return None

        # Initialize AssemblyAI
        aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
        if not aai.settings.api_key:
            logger.error("AssemblyAI API key not found")
            return None

        transcriber = aai.Transcriber()
        logger.info(f"Uploading audio file: {audio_path}")
        with open(audio_path, "rb") as audio_file:
            transcript = transcriber.transcribe(audio_file)
        transcript.wait_for_completion()

        words = transcript.words
        if not words:
            logger.error("No words found in transcript")
            return None
        logger.info(f"Received {len(words)} words from transcription")

        # Load the font once instead of once per word.
        try:
            font = ImageFont.truetype(FONT_PATH, 72)
        except OSError:
            logger.warning(f"Failed to load font from {FONT_PATH}, using default")
            font = ImageFont.load_default()

        subtitles = []
        window_size = 4  # Number of words to show at once
        half_window = window_size // 2
        for i, current_word in enumerate(words):
            try:
                # Get context words (previous and next words)
                start_idx = max(0, i - half_window)
                end_idx = min(len(words), i + half_window + 1)
                context_words = [w.text for w in words[start_idx:end_idx]]

                # Create image with text; i - start_idx is the current
                # word's position inside the window.
                img_array = _render_subtitle_image(context_words, i - start_idx, font)
                clip = ImageClip(img_array)

                # Timestamps are divided by 1000 to get seconds
                start_time = float(current_word.start) / 1000
                duration = float(current_word.end - current_word.start) / 1000

                # Position and time the clip
                clip = (clip
                        .set_position(('center', 'center'))
                        .set_start(start_time)
                        .set_duration(duration))
                subtitles.append(clip)
                logger.info(f"Successfully created clip for word: {current_word.text}")
            except Exception as clip_error:
                # One bad word should not abort the whole subtitle track.
                logger.error(f"Failed to create clip for word '{current_word.text}'. Error: {clip_error}")
                continue

        if not subtitles:
            logger.error("No valid subtitle clips were created")
            return None
        logger.info(f"Successfully generated {len(subtitles)} subtitle clips")
        return subtitles
    except Exception as e:
        logger.error(f"Error generating subtitles: {e}")
        logger.exception("Full traceback:")
        return None
def is_post_url(url):
    """Tell whether ``url`` points at a specific Reddit post.

    A post URL starts with ``http(s)://[www.]reddit.com/r/<subreddit>/comments/<id>``.
    Falsy input (``None``, ``""``) yields ``False``.
    """
    if url:
        post_pattern = r'https?://(?:www\.)?reddit\.com/r/\w+/comments/\w+/?'
        return re.match(post_pattern, url) is not None
    return False
def generate_audio_from_reddit(url, filter_type, time_filter, selected_voice, rate, pitch):
    """Fetch Reddit content for ``url`` and synthesize it to an audio file.

    A direct post URL is fetched as that post; anything else is treated as a
    subreddit URL from which a suitable post is picked.

    Args:
        url: Reddit post URL or subreddit URL.
        filter_type: Listing type for subreddit URLs.
        time_filter: Time filter for subreddit URLs.
        selected_voice: Voice selection passed to ``text_to_speech``.
        rate: Speech rate adjustment passed to ``text_to_speech``.
        pitch: Pitch adjustment passed to ``text_to_speech``.

    Returns:
        Path of the generated audio file, or ``None`` on any failure (the
        reason is logged).
    """
    try:
        # Get content based on URL type
        try:
            if is_post_url(url):
                logger.info("Processing direct post URL...")
                post_content, comments = fetch_post_and_comments_from_url(url)
            else:
                logger.info("Processing subreddit URL...")
                post_content, comments = fetch_top_post_and_comments(url, filter_type, time_filter)
        except ValueError as e:
            logger.error(f"Error fetching content: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error fetching content: {e}")
            return None

        if not post_content or not comments:
            logger.error("Failed to get Reddit content")
            return None

        # Combine content. The space after the colon keeps the first
        # comment from being glued to "Listen:".
        spoken_comments = "".join(f"{comment}. " for comment in comments)
        combined_content = f"{post_content}. Hey, Listen: {spoken_comments}"

        # Generate audio
        audio_path = asyncio.run(text_to_speech(combined_content, voice=selected_voice, rate=rate, pitch=pitch))
        if not audio_path:
            logger.error("Failed to generate audio")
            return None
        logger.info(f"Successfully generated audio at: {audio_path}")
        return audio_path
    except Exception as e:
        logger.error(f"Error in generate_audio_from_reddit: {e}")
        return None
def search_pexels_video(query, page=1, per_page=5):
    """Search Pexels for a video and return a download link for one result.

    A random video is picked from the requested results page; among its
    files, those at least 720 px wide are preferred and the one with the
    largest pixel area is chosen.

    Args:
        query: Search keyword.
        page: Results page to request.
        per_page: Number of results per page.

    Returns:
        The chosen file's link, or ``None`` if the API key is missing, the
        request fails, or no usable video file is found.
    """
    if not PEXELS_API_KEY:
        logger.error("PEXELS_API_KEY is missing from environment variables")
        return None

    logger.info(f"Searching Pexels for: '{query}', page: {page}, per_page: {per_page}")
    headers = {"Authorization": PEXELS_API_KEY}
    params = {"query": query, "per_page": per_page, "page": page}
    try:
        # A timeout keeps a stalled connection from blocking forever.
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers=headers,
            params=params,
            timeout=30,
        )
    except requests.exceptions.RequestException as e:
        logger.error(f"Pexels API request failed: {e}")
        return None

    logger.debug(f"Pexels API response status: {response.status_code} for query '{query}' page {page}")
    if response.status_code != 200:
        logger.error(f"Pexels API error: {response.text}")
        return None

    try:
        data = response.json()
    except ValueError as e:
        logger.error(f"Pexels API returned invalid JSON: {e}")
        return None

    videos_on_page = data.get("videos", [])
    logger.info(f"Found {len(videos_on_page)} videos for query: {query}")
    if not videos_on_page:
        logger.warning(f"No videos found on page {page} for query: {query}")
        return None

    # To introduce more variety, pick a random video from the current page's
    # results instead of always the first one.
    selected_video_data = random.choice(videos_on_page)
    video_files = selected_video_data.get("video_files", [])
    if not video_files:
        logger.warning(f"Selected video (ID: {selected_video_data.get('id')}) has no video_files.")
        return None

    def _width(entry):
        # Missing or null dimensions are treated as 0.
        return entry.get("width") or 0

    def _area(entry):
        return _width(entry) * (entry.get("height") or 0)

    # Prefer files at least 720 px wide; fall back to all files otherwise.
    files_to_consider = [f for f in video_files if _width(f) >= 720] or video_files
    # max() returns the first of equal candidates, matching a stable
    # descending sort followed by taking the first element.
    best_file = max(files_to_consider, key=_area)

    link = best_file.get("link")
    if not link:
        logger.warning(f"No suitable video files after filtering for video ID: {selected_video_data.get('id')}")
        return None
    logger.info(f"Selected video file: {link} ({best_file.get('width')}x{best_file.get('height')}) for query '{query}' page {page}")
    return link
def _fetch_and_process_single_pexels_video(keyword_query, page_number, target_resolution=(1080, 1920), max_segment_duration=3.0):
    """Fetch one Pexels video for a keyword and fit it to the target frame.

    The video is downloaded to a temporary ``.mp4`` under ``/tmp``, trimmed
    to at most ``max_segment_duration`` seconds, resized so it covers the
    target resolution, and centre-cropped to exactly that resolution.

    The temporary file is NOT deleted on success: the returned clip is
    derived from the opened source clip, so the caller owns both the clip
    and the file and must clean them up when finished.

    Args:
        keyword_query: Search keyword for Pexels.
        page_number: Results page to search.
        target_resolution: ``(width, height)`` of the output frame.
        max_segment_duration: Longest segment (seconds) to keep from the
            downloaded video.

    Returns:
        ``(clip, temp_path)`` on success, ``(None, None)`` on any failure
        (in which case the temporary file has been removed).
    """
    url = search_pexels_video(keyword_query, page=page_number)
    if not url:
        logger.warning(f"No Pexels URL found for '{keyword_query}'.")
        return None, None  # Indicate failure

    # Create a unique temporary file path
    temp_dir = "/tmp"
    os.makedirs(temp_dir, exist_ok=True)  # Ensure /tmp exists
    temp_fd, temp_path = tempfile.mkstemp(suffix=".mp4", dir=temp_dir, prefix="pexels_")
    os.close(temp_fd)  # Only the path is needed; the file is reopened below

    video_clip_obj = None
    try:
        logger.info(f"Downloading Pexels clip from {url} to {temp_path}")
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()  # Check for HTTP errors
            with open(temp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # skip empty chunks
                        f.write(chunk)
        logger.info(f"Pexels clip downloaded to {temp_path}")

        if not os.path.exists(temp_path) or os.path.getsize(temp_path) == 0:
            logger.error(f"Downloaded Pexels clip is missing or empty: {temp_path}")
            if os.path.exists(temp_path):
                os.remove(temp_path)  # Clean up failed download
            return None, None

        video_clip_obj = VideoFileClip(temp_path)

        # Trim long videos to the configured maximum segment length.
        if video_clip_obj.duration > max_segment_duration:
            logger.info(f"Original Pexels clip for '{keyword_query}' is {video_clip_obj.duration:.2f}s long. Subclipping to {max_segment_duration:.1f}s.")
            active_segment = video_clip_obj.subclip(0, max_segment_duration)
        else:
            active_segment = video_clip_obj  # Use the full clip if it's short enough

        # Resize so the clip covers the target frame, then centre-crop:
        # a clip wider than the target is scaled by height, otherwise by width.
        clip_aspect_ratio = active_segment.w / active_segment.h
        target_aspect_ratio = target_resolution[0] / target_resolution[1]
        if clip_aspect_ratio > target_aspect_ratio:
            resized_segment = active_segment.resize(height=target_resolution[1])
        else:
            resized_segment = active_segment.resize(width=target_resolution[0])

        final_processed_segment = resized_segment.crop(
            x_center=resized_segment.w / 2,  # centre of the *resized* segment
            y_center=resized_segment.h / 2,
            width=target_resolution[0],
            height=target_resolution[1]
        )

        # video_clip_obj is deliberately left open: the processed segment is
        # derived from it, so the caller manages its lifecycle.
        logger.info(f"Pexels clip for '{keyword_query}' downloaded and processed. Path: {temp_path}, Final Segment Duration: {final_processed_segment.duration:.2f}s")
        return final_processed_segment, temp_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download Pexels video for '{keyword_query}': {e}")
        if os.path.exists(temp_path):
            os.remove(temp_path)
        return None, None
    except Exception as e:
        logger.error(f"Failed to process Pexels video: {e}")
        if video_clip_obj:
            video_clip_obj.close()  # Close if an error occurred after opening
        if os.path.exists(temp_path):
            os.remove(temp_path)  # Clean up on error
        return None, None
| def get_video_clip(background_type, duration, target_resolution=(1080, 1920), pexels_keywords=None): | |
| """Get a video clip with random start time and specified duration.""" | |
| logger.info(f"[DEBUG] background_type: '{background_type}', pexels_keywords: '{pexels_keywords}'") | |
| try: | |
| if background_type == "Pexels": | |
| if not pexels_keywords or not isinstance(pexels_keywords, list) or not any(kw.strip() for kw in pexels_keywords): | |
| logger.warning("No valid Pexels keywords provided or list is empty. Using solid green background.") | |
| return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] | |
| collected_clips = [] | |
| temp_files_to_delete_later = [] | |
| current_total_duration = 0.0 | |
| # --- New Pexels clip collection logic for grouping and duration fill --- | |
| # The pexels_keywords list is already stripped and filtered in tts_interface | |
| # before being passed to create_video_from_story or create_video_with_background. | |
| # So, pexels_keywords here should be a clean list of non-empty strings. | |
| # Cycle through keywords, fetching one clip per keyword per cycle, until duration is met. | |
| keyword_page_trackers = {kw: 1 for kw in pexels_keywords} # Track current page for each keyword | |
| MAX_FAILURES_PER_KEYWORD_TOTAL = 3 # Max total failures (no clip from any page) for a keyword before skipping it | |
| keyword_failure_counts = {kw: 0 for kw in pexels_keywords} | |
| active_keywords = list(pexels_keywords) # Keywords we are still trying to get clips from | |
| while current_total_duration < duration and any(active_keywords): | |
| keyword_processed_in_cycle = False | |
| for keyword_idx, keyword in enumerate(list(active_keywords)): # Iterate over a copy for safe removal | |
| if current_total_duration >= duration: | |
| break | |
| if keyword_failure_counts[keyword] >= MAX_FAILURES_PER_KEYWORD_TOTAL: | |
| if keyword in active_keywords: active_keywords.remove(keyword) # Stop trying this keyword | |
| continue | |
| current_page = keyword_page_trackers[keyword] | |
| logger.info(f"Trying keyword '{keyword}', page {current_page}. Total duration: {current_total_duration:.2f}s / {duration:.2f}s") | |
| clip_segment, temp_file_path = _fetch_and_process_single_pexels_video(keyword, current_page, target_resolution) | |
| if clip_segment and temp_file_path: | |
| collected_clips.append(clip_segment) | |
| temp_files_to_delete_later.append(temp_file_path) | |
| current_total_duration += clip_segment.duration | |
| keyword_page_trackers[keyword] += 1 # Move to next page for this keyword on its next turn | |
| keyword_failure_counts[keyword] = 0 # Reset failure count on success | |
| keyword_processed_in_cycle = True | |
| logger.info(f"Added clip for '{keyword}' (page {current_page}). Segment: {clip_segment.duration:.2f}s. Total: {current_total_duration:.2f}s") | |
| else: | |
| logger.warning(f"No clip found for '{keyword}' on page {current_page}.") | |
| keyword_failure_counts[keyword] += 1 | |
| keyword_page_trackers[keyword] += 1 # Still try next page next time | |
| if keyword_failure_counts[keyword] >= MAX_FAILURES_PER_KEYWORD_TOTAL: | |
| logger.info(f"Max total failures reached for keyword '{keyword}'. Removing from active list.") | |
| if keyword in active_keywords: active_keywords.remove(keyword) | |
| if not keyword_processed_in_cycle and any(active_keywords): # If a full cycle through active keywords yields nothing | |
| logger.info("A full cycle through active keywords yielded no new clips. Stopping Pexels search.") | |
| break | |
| # --- End of Pexels clip collection logic --- | |
| if not collected_clips: | |
| logger.warning("No Pexels clips were collected. Using solid green background.") | |
| return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] | |
| # Concatenate all collected clips | |
| logger.info(f"Concatenating {len(collected_clips)} Pexels clips.") | |
| final_pexels_video = concatenate_videoclips(collected_clips, method="compose") | |
| if final_pexels_video.duration == 0: # Should not happen if collected_clips is not empty | |
| logger.error("Concatenated Pexels video has zero duration. This should not happen if clips were collected. Using solid green.") | |
| for p_clip in collected_clips: p_clip.close() # Close individual segments | |
| for f_path in temp_files_to_delete_later: # Delete their temp files | |
| if os.path.exists(f_path): os.remove(f_path) | |
| return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] | |
| if final_pexels_video.duration > duration: | |
| logger.info(f"Subclipping concatenated Pexels video (duration {final_pexels_video.duration:.2f}s) to target duration ({duration:.2f}s)") | |
| final_pexels_video = final_pexels_video.subclip(0, duration) | |
| else: | |
| logger.info(f"Collected Pexels video duration {final_pexels_video.duration:.2f}s. Target audio duration is {duration:.2f}s. Video will be adjusted to audio duration during final composition if shorter.") | |
| return final_pexels_video, temp_files_to_delete_later | |
| # --- Handling for other background types (Minecraft, Cake Making, etc.) --- | |
| # Define video paths relative to the script location | |
| script_dir = os.path.dirname(os.path.abspath(__file__)) | |
| video_paths = { | |
| "Minecraft": os.path.join(script_dir, "Minecraft.mp4"), | |
| "Cake Making": os.path.join(script_dir, "A Collection OF CAKE Oddly Satisfying Chocolate Cake You Never Seen _ Awesome Cake Decorating Ideas.mp4"), | |
| "Satisfying ART": os.path.join(script_dir, "TOP 80 Satisfying Art Videos _ Best of The Year Quantastic.mp4"), | |
| } | |
| # Handle solid color backgrounds | |
| if background_type == "Black": | |
| logger.info("Creating solid black background") | |
| return ColorClip(size=target_resolution, color=(0, 0, 0)).set_duration(duration), [] | |
| elif background_type == "Green": | |
| logger.info("Creating solid green background") | |
| return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] | |
| # Handle predefined video backgrounds | |
| if background_type not in video_paths: | |
| logger.error(f"Invalid background type: {background_type}") | |
| logger.info(f"Available backgrounds: {list(video_paths.keys())}") | |
| return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] # Default to green | |
| video_path = video_paths[background_type] | |
| if not os.path.exists(video_path): | |
| logger.error(f"Background video not found: {video_path}") | |
| logger.info(f"Looking in directory: {script_dir}") | |
| logger.info(f"Available files: {os.listdir(script_dir)}") | |
| return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] | |
| logger.info(f"Loading background video: {video_path}") | |
| video = VideoFileClip(video_path) # This clip needs to be closed later | |
| max_start = max(0, video.duration - duration) | |
| start_time = random.uniform(0, max_start) if max_start > 0 else 0 | |
| logger.info(f"Selected start time: {start_time:.2f}s") | |
| clip = video.subclip(start_time, start_time + duration) | |
| target_aspect = target_resolution[0] / target_resolution[1] | |
| clip_aspect = clip.w / clip.h | |
| if clip_aspect > target_aspect: | |
| new_width = int(clip.h * target_aspect) | |
| x_center = clip.w // 2; x1 = x_center - (new_width // 2); x2 = x_center + (new_width // 2) | |
| clip = clip.crop(x1=x1, x2=x2) | |
| else: | |
| new_height = int(clip.w / target_aspect) | |
| y_center = clip.h // 2; y1 = y_center - (new_height // 2); y2 = y_center + (new_height // 2) | |
| clip = clip.crop(y1=y1, y2=y2) | |
| clip = clip.resize(target_resolution) | |
| if clip.duration < duration: | |
| clip = clip.loop(duration=duration) | |
| # For non-Pexels, the original 'video' object needs to be closed if 'clip' is derived. | |
| # However, 'clip' itself is what's returned. If 'clip' is a subclip, it shares the reader. | |
| # It's safer to close 'video' after 'clip' is fully processed by the caller. | |
| # For simplicity here, we assume 'clip' is self-contained enough or its reader is managed by MoviePy. | |
| # A more robust solution would track 'video' for later closure. | |
| # For now, returning just the clip and an empty list for temp files. | |
| # The 'video' object will be closed by Python's GC if not explicitly closed, but explicit is better. | |
| # This part needs careful thought on resource management if we were to return 'video' for closure. | |
| # Since we return 'clip', and 'video' is local, it should be fine. | |
| # The main issue was with Pexels temp files. | |
| logger.info("Background video clip created successfully") | |
| return clip, [] # Return clip and empty list for temp_files | |
| except Exception as e: | |
| logger.error(f"Error creating video background: {e}") | |
| logger.exception("Full traceback:") | |
| return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] | |
def _close_clip_quietly(clip):
    """Close one MoviePy clip, logging instead of raising so later cleanup still runs."""
    if clip is None:
        return
    try:
        clip.close()
    except Exception as e:
        logger.warning(f"Error during cleanup: {e}")


def _remove_pexels_temp_files(temp_files):
    """Delete the downloaded Pexels temp files; failures are logged and skipped."""
    for temp_f_path in temp_files or []:
        try:
            if os.path.exists(temp_f_path):
                os.remove(temp_f_path)
                logger.info(f"Cleaned up Pexels temp file: {temp_f_path}")
        except Exception as e_del:
            logger.warning(f"Error deleting Pexels temp file {temp_f_path}: {e_del}")


def _render_video_file(audio_path, subtitles, background, output_path, pexels_keywords):
    """Compose background + subtitles + audio and write the result to output_path.

    Returns True when write_videofile finished without raising, False otherwise.
    Every clip opened here (and the subtitle clips passed in) is closed, and the
    Pexels temp files are removed, on all exit paths.
    """
    audio = None
    background_clip = None
    video = None
    # Bound before anything can fail so the cleanup below can always iterate it.
    pexels_temp_files_to_clean = []
    # Names the step in progress so a failure is logged with the matching message.
    stage = "loading audio file"
    try:
        logger.info(f"Loading audio file: {audio_path}")
        audio = AudioFileClip(audio_path)
        duration = audio.duration
        logger.info(f"Audio duration: {duration} seconds")

        stage = "creating background clip"
        logger.info(f"Creating background clip with {background} background...")
        result_bg = get_video_clip(background, duration, target_resolution=(1080, 1920), pexels_keywords=pexels_keywords)
        if isinstance(result_bg, tuple):
            background_clip, temp_files = result_bg
            pexels_temp_files_to_clean = temp_files or []
        else:
            # Tolerate a bare clip; the temp-file list then stays empty.
            background_clip = result_bg
        if background_clip is None:
            logger.error("Failed to create background clip")
            return False
        logger.info("Background clip created successfully")

        stage = "combining clips"
        logger.info(f"Combining {len(subtitles)} subtitle clips with background...")
        video = CompositeVideoClip([background_clip] + subtitles, size=(1080, 1920))
        logger.info("Clips combined successfully")

        stage = "setting audio"
        logger.info("Setting audio to video...")
        video = video.set_audio(audio)
        logger.info("Audio set successfully")

        stage = "writing video file"
        logger.info(f"Writing video to file: {output_path}")
        video.write_videofile(
            output_path,
            fps=30,
            codec='libx264',
            audio_codec='aac',
            temp_audiofile='temp-audio.m4a',
            remove_temp=True,
            logger=None
        )
        logger.info("Video written successfully")
        return True
    except Exception as e:
        logger.error(f"Error {stage}: {e}")
        logger.exception("Full traceback:")
        return False
    finally:
        # Each close is isolated so one failing clip cannot skip the others.
        _close_clip_quietly(video)
        _close_clip_quietly(audio)
        _close_clip_quietly(background_clip)
        # A concatenated Pexels background exposes its source segments via `.clips`;
        # close them explicitly so their temp files can be deleted afterwards.
        segments = getattr(background_clip, 'clips', None) if background == "Pexels" else None
        if segments:
            logger.info(f"Attempting to close {len(segments)} Pexels sub-clips.")
            for pexels_segment_clip in segments:
                _close_clip_quietly(pexels_segment_clip)
        for subtitle_clip in subtitles:
            _close_clip_quietly(subtitle_clip)
        _remove_pexels_temp_files(pexels_temp_files_to_clean)


def create_video_with_background(audio_path, subtitles, subreddit_url, selected_font="Mouldy Cheese", background="Green", output_path="/tmp/output_video.mp4", pexels_keywords=None):
    """Render a 1080x1920 video from audio and subtitle clips, then upload it.

    Args:
        audio_path: Path to an existing audio file; its duration sets the video length.
        subtitles: List of clips layered on top of the background.
        subreddit_url: Accepted for interface compatibility; not used here.
        selected_font: Accepted for interface compatibility; not used here.
        background: Background type forwarded to get_video_clip.
        output_path: Where the rendered MP4 is written.
        pexels_keywords: Keywords forwarded to get_video_clip.

    Returns:
        (output_path, video_url) on success, (None, None) on any failure
        (invalid input, render error, missing output file, or upload error).
    """
    logger.info(f"[DEBUG] create_video_with_background received pexels_keywords: '{pexels_keywords}'")
    try:
        logger.info("Starting video creation process...")
        # Validate audio path
        if not isinstance(audio_path, str) or not os.path.exists(audio_path):
            logger.error(f"Invalid audio path: {audio_path}")
            return None, None
        # Validate subtitles
        if not isinstance(subtitles, list):
            logger.error(f"Invalid subtitles format: {type(subtitles)}")
            return None, None

        if not _render_video_file(audio_path, subtitles, background, output_path, pexels_keywords):
            return None, None

        if not os.path.exists(output_path):
            logger.error("Video file not found after creation")
            return None, None

        logger.info(f"Video successfully created at: {output_path}")
        # Upload to Hugging Face Dataset
        hf_api = HfApi()
        dataset_repo = "lolhaha002/redditbotdata"  # Change this to your dataset name
        logger.info(f"Uploading video to Hugging Face Dataset: {output_path}")
        video_filename = os.path.basename(output_path)
        hf_api.upload_file(
            path_or_fileobj=output_path,
            path_in_repo=f"videos/{video_filename}",
            repo_id=dataset_repo,
            repo_type="dataset"
        )
        # Generate public URL
        video_url = f"https://huggingface.co/datasets/{dataset_repo}/resolve/main/videos/{video_filename}"
        logger.info(f"Video uploaded successfully: {video_url}")
        return output_path, video_url
    except Exception as e:
        # Clips and temp files are already released by _render_video_file's finally.
        logger.error(f"Unexpected error in video creation: {e}")
        logger.exception("Full traceback:")
        return None, None
def _parse_pexels_keywords(raw_keywords):
    """Split a comma-separated string into stripped, non-empty keywords ([] for non-strings)."""
    if not isinstance(raw_keywords, str):
        return []
    return [kw.strip() for kw in raw_keywords.split(",") if kw.strip()]


def _subreddit_name_from_url(subreddit_url):
    """Return the subreddit name from a Reddit URL, restricted to filename-safe characters.

    Falls back to "unknown" when the URL has no "reddit.com/r/" part or the
    segment after it does not start with a letter, digit, or underscore.
    """
    marker = "reddit.com/r/"
    if not subreddit_url or marker not in subreddit_url:
        return "unknown"
    segment = subreddit_url.split(marker)[-1].split("/")[0]
    # Stop at the first character outside [A-Za-z0-9_] so a query string or
    # fragment (e.g. "AskReddit?sort=top") never ends up in the output filename.
    match = re.match(r"[A-Za-z0-9_]+", segment)
    return match.group(0) if match else "unknown"


def tts_interface(subreddit_url, story_text, filter_type, time_filter, selected_voice, rate, pitch, background, pexels_keywords):
    """Gradio handler: turn a pasted story or a Reddit URL into a narrated video.

    A non-empty story_text takes precedence and is delegated to
    create_video_from_story. Otherwise audio is generated from the Reddit URL,
    subtitles are generated from that audio, and the video is rendered and
    uploaded by create_video_with_background.

    Args:
        subreddit_url: Subreddit or post URL (may be empty when a story is given).
        story_text: User-supplied story (may be empty when a URL is given).
        filter_type, time_filter: Forwarded to generate_audio_from_reddit.
        selected_voice, rate, pitch: Voice settings forwarded to the audio generator.
        background: Background type for the video.
        pexels_keywords: Comma-separated keyword string from the UI.

    Returns:
        (video_path, video_url, status_message) for the Reddit path, with the
        first two set to None on failure. The story path returns whatever
        create_video_from_story returns (presumably the same triple — confirm).
    """
    logger.info(f"[DEBUG] tts_interface received pexels_keywords: '{pexels_keywords}'")
    # Tracked outside the try so the finally can delete it on every exit path.
    audio_path = None
    try:
        logger.info("Starting TTS interface process...")
        logger.info(f"Selected background: {background}")
        # Story Logic
        story_text = (story_text or "").strip()
        subreddit_url = (subreddit_url or "").strip()
        if not story_text and not subreddit_url:
            return None, None, "Please provide either a story or a Reddit URL."
        if story_text:
            keywords_list_story = _parse_pexels_keywords(pexels_keywords)
            logger.info(f"[DEBUG] Parsed keywords list for story: {keywords_list_story}")
            return create_video_from_story(story_text, selected_voice, rate, pitch, background, pexels_keywords=keywords_list_story)

        # Generate audio
        logger.info("Generating audio from Reddit content...")
        audio_path = generate_audio_from_reddit(subreddit_url, filter_type, time_filter, selected_voice, rate, pitch)
        if not audio_path or not isinstance(audio_path, str):
            logger.error(f"Invalid audio path returned: {audio_path}")
            return None, None, "Failed to generate audio: Content not suitable or contains NSFW material"
        logger.info(f"Audio generated successfully: {audio_path}")

        # Generate subtitles
        logger.info("Generating subtitles")
        subtitles = generate_subtitles(audio_path)
        if subtitles is None:
            logger.error("Failed to generate subtitles")
            return None, None, "Failed to generate subtitles"
        logger.info(f"Generated {len(subtitles)} subtitle clips")

        # Create unique output path
        timestamp = int(time.time())
        subreddit_name = _subreddit_name_from_url(subreddit_url)
        video_filename = f"{subreddit_name}_{timestamp}.mp4"
        output_path = f"/tmp/{video_filename}"
        logger.info(f"Creating video with output path: {output_path}")

        # Create video
        logger.info(f"[DEBUG] Raw pexels_keywords input: '{pexels_keywords}'")
        keywords_list = _parse_pexels_keywords(pexels_keywords)
        logger.info(f"[DEBUG] Parsed keywords list: {keywords_list}")
        if not keywords_list:
            logger.warning("No valid Pexels keywords found after parsing.")
        else:
            for i, kw in enumerate(keywords_list, start=1):
                logger.info(f"[DEBUG] Keyword {i}: '{kw}'")
        video_path, video_url = create_video_with_background(
            audio_path=audio_path,
            subtitles=subtitles,
            subreddit_url=subreddit_url,
            selected_font="Mouldy Cheese",
            background=background,
            output_path=output_path,
            pexels_keywords=keywords_list
        )
        if video_path is None:
            logger.error("Failed to create video")
            return None, None, "Failed to create video"
        logger.info(f"Video created at: {video_path}")
        logger.info(f"Video URL: {video_url}")
        logger.info("Video generation process completed successfully")
        return video_path, video_url, "Video generated and uploaded successfully!"
    except Exception as e:
        logger.error(f"Error in TTS interface: {e}")
        logger.exception("Full traceback:")
        return None, None, f"Error: {str(e)}"
    finally:
        # Clean up the audio file only (the video is kept for preview). Doing it
        # here also covers the failure paths, which previously left it in /tmp.
        if audio_path and isinstance(audio_path, str):
            try:
                if os.path.exists(audio_path):
                    os.remove(audio_path)
                    logger.info(f"Cleaned up temporary audio file: {audio_path}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary files: {e}")
if __name__ == "__main__":
    # Intro text shown at the top of the page. Written as an explicit string so
    # the rendered Markdown does not depend on how this source file is indented.
    intro_markdown = (
        "\n"
        "# Reddit to Video Generator\n"
        "Enter either:\n"
        "- A subreddit URL (e.g., https://www.reddit.com/r/AskReddit/) to get top posts\n"
        "- A direct post URL (e.g., https://www.reddit.com/r/AskReddit/comments/abc123/post_title/) to use that specific post\n"
        "If you are looking for a bulk shorts creator and/or wants to provide a support, please checkout my fiverr gigs https://www.fiverr.com/s/dDdbGXZ\n"
    )

    # NOTE(review): the original nesting of the button and launch call could not
    # be recovered from the source; this uses the conventional layout (inputs
    # and outputs side by side, button below the row) — confirm against the UI.
    with gr.Blocks() as demo:
        gr.Markdown(intro_markdown)
        with gr.Row():
            # Left column: all inputs, in the order tts_interface expects them.
            with gr.Column():
                url_input = gr.Textbox(
                    label="Reddit URL",
                    placeholder="Enter subreddit URL or direct post URL"
                )
                story_input = gr.Textbox(
                    label="Or Enter Your Own Story",
                    placeholder="Paste or write your story here (no Reddit needed)",
                    lines=6
                )
                word_count_display = gr.Textbox(label="Word Count", interactive=False)
                # Live word count for the story box.
                story_input.change(
                    fn=count_words,
                    inputs=story_input,
                    outputs=word_count_display
                )
                filter_type = gr.Dropdown(
                    ["hot", "top", "new"],
                    label="Filter Type (for subreddit URLs only)",
                    value="hot"
                )
                time_filter = gr.Dropdown(
                    ["hour", "day", "week", "month", "year", "all"],
                    label="Time Filter (for subreddit URLs only)",
                    value="day"
                )
                selected_voice = gr.Dropdown(
                    choices=[f"{k} - {v}" for k, v in VOICE_OPTIONS.items()],
                    value="en-US-GuyNeural - Male (American)",
                    label="Voice"
                )
                rate = gr.Slider(
                    minimum=-100,
                    maximum=100,
                    value=0,
                    step=10,
                    label="Voice Speed"
                )
                pitch = gr.Slider(
                    minimum=-100,
                    maximum=100,
                    value=0,
                    step=10,
                    label="Voice Pitch"
                )
                background = gr.Dropdown(
                    list(BACKGROUND_OPTIONS.keys()),
                    label="Background",
                    value="Green"  # Changed default to Green
                )
                pexels_keywords = gr.Textbox(
                    label="Pexels Keywords (for video background)",
                    placeholder="e.g., nature, city night, forest"
                )
            # Right column: outputs filled by tts_interface's three return values.
            with gr.Column():
                video_preview = gr.Video(label="Video Preview")
                dataset_url = gr.Textbox(label="Dataset URL (Click to View)")
                status_text = gr.Textbox(label="Status")
        submit_btn = gr.Button("Generate Video")
        submit_btn.click(
            fn=tts_interface,
            inputs=[url_input, story_input, filter_type, time_filter, selected_voice, rate, pitch, background, pexels_keywords],
            outputs=[video_preview, dataset_url, status_text],
            queue=True
        )
    demo.launch(share=True)