import os
import praw
import time
import logging
import gradio as gr
from dotenv import load_dotenv
import re
from pydub import AudioSegment
import asyncio
import tempfile
import edge_tts
import random
import assemblyai as aai
from moviepy.config import change_settings
from moviepy.editor import *
from moviepy.editor import TextClip, CompositeVideoClip, AudioFileClip, ColorClip
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from huggingface_hub import HfApi, login
import requests

# Initialize logger (configured once; a second basicConfig() call would be a no-op)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Import-time side effect kept as-is: installs the fonts-dejavu system package.
os.system("apt-get update && apt-get install -y fonts-dejavu")

FONT_PATH = "MouldyCheeseRegular-WyMWG.ttf"

# Verify font file existence
if not os.path.exists(FONT_PATH):
    raise FileNotFoundError(f"Font file not found: {FONT_PATH}")
print(f"Using font at: {FONT_PATH}")

change_settings({"IMAGEMAGICK_BINARY": None})  # Disable ImageMagick

# Load environment variables (Hugging Face Secrets)
load_dotenv()
REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID")
REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET")
REDDIT_USER_AGENT = os.getenv("REDDIT_USER_AGENT")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")

# Initialize Reddit client. `reddit` stays None when construction fails so that
# later code can report a clear error instead of hitting a NameError.
reddit = None
try:
    reddit = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )
    # Test the connection (single request; the result is printed as before)
    print(reddit.user.me())
except Exception as e:
    logger.error(f"Failed to initialize Reddit client: {e}")
    logger.error("Please check your Reddit API credentials in the .env file")
    logger.error("Required environment variables: REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT")

# Voice options with descriptions
VOICE_OPTIONS = {
    # American English Voices
    "en-US-GuyNeural": "Male (American)",
    "en-US-JennyNeural": "Female (American)",
    "en-US-AriaNeural": "Female (American)",
    # British English Voices
    "en-GB-RyanNeural": "Male (British)",
    "en-GB-SoniaNeural": "Female (British)",
    "en-GB-LibbyNeural": "Female (British)",
    "en-GB-AlfieNeural": "Male (British)",
    "en-GB-ElliotNeural": "Male (British)",
    # Australian English Voices
    "en-AU-NatashaNeural": "Female (Australian)",
    "en-AU-WilliamNeural": "Male (Australian)",
    "en-AU-AnnetteNeural": "Female (Australian)",
    "en-AU-CarlyNeural": "Female (Australian)",
    "en-AU-DuncanNeural": "Male (Australian)",
    # Indian English Voices
    "en-IN-NeerjaNeural": "Female (Indian)",
    "en-IN-PrabhatNeural": "Male (Indian)",
    # Irish English Voices
    "en-IE-ConnorNeural": "Male (Irish)",
    "en-IE-EmilyNeural": "Female (Irish)",
    # Canadian English Voices
    "en-CA-ClaraNeural": "Female (Canadian)",
    "en-CA-LiamNeural": "Male (Canadian)"
}

# Background video options
BACKGROUND_OPTIONS = {
    "Green": "Solid green background",
    "Black": "Solid black background",
    "Minecraft": "Minecraft gameplay",
    "Cake Making": "Oddly Satisfying Cake Making",
    "Satisfying ART": "Satisfying Art background",
    "Pexels": "Use stock videos based on keywords from Pexels"
}

# Directory to save audio files
os.makedirs("audio_outputs", exist_ok=True)

HF_TOKEN = os.getenv("HF_TOKEN")  # Fetch the token from Hugging Face Secrets
if HF_TOKEN:
    login(HF_TOKEN)
else:
    # login(None) would fall back to an interactive prompt, which cannot be
    # answered in a headless deployment.
    logger.warning("HF_TOKEN is not set; skipping Hugging Face login")

# Initialize Hugging Face API
hf_api = HfApi()

# Comment bodies (compared after clean_text + lower-casing) that carry no content.
_EMPTY_COMMENT_BODIES = frozenset({"deleted", "[deleted]", "removed", "[removed]"})


def clean_text(text):
    """Remove emojis and unsupported characters from text."""
    text = re.sub(r'[\U00010000-\U0010FFFF]+', '', text)  # Remove emojis
    text = re.sub(r'[^\w\s.,!?\'"-]', '', text)  # Remove unsupported characters
    return text


def load_nsfw_words(file_path="nsfw_words.txt"):
    """Load NSFW words from a file.

    Returns a list of lower-cased, stripped, non-empty lines, or an empty
    list (with a warning) when the file does not exist.
    """
    if not os.path.exists(file_path):
        logger.warning(f"NSFW words file not found: {file_path}")
        return []
    with open(file_path, "r", encoding="utf-8") as f:
        return [line.strip().lower() for line in f if line.strip()]


# Load NSFW words dynamically
NSFW_WORDS = load_nsfw_words()


def _nsfw_pattern():
    """Build a case-insensitive whole-word pattern for NSFW_WORDS (None if empty).

    "Whole word" means not directly adjacent to an alphanumeric character;
    `[^\\W_]` is the regex spelling of `str.isalnum()`. Longer entries are
    listed first so a multi-word phrase wins over a shorter word it contains.
    """
    words = sorted({w for w in NSFW_WORDS if w}, key=lambda w: (-len(w), w))
    if not words:
        return None
    alternation = "|".join(re.escape(w) for w in words)
    return re.compile(rf"(?<![^\W_])(?:{alternation})(?![^\W_])", re.IGNORECASE)


def filter_nsfw_words(text):
    """Replace NSFW words with [beep].

    Matching is whole-word and case-insensitive and is done in a single
    pass over the original text, so inserted "[beep]" markers are never
    re-scanned and no index bookkeeping against a lower-cased copy is needed.
    Falsy input is returned unchanged.
    """
    if not text:
        return text
    pattern = _nsfw_pattern()
    if pattern is None:
        return text
    return pattern.sub("[beep]", text)


def contains_nsfw_words(text):
    """Check if text contains NSFW words (same whole-word rule as filter_nsfw_words)."""
    if not text:
        return False
    pattern = _nsfw_pattern()
    return bool(pattern and pattern.search(text))


# Word Count logic for story input
def count_words(text):
    """Return the whitespace-separated word count of *text* as "<n> words"."""
    text = text or ""
    word_count = len(text.strip().split())
    return f"{word_count} words"


def _build_post_content(submission):
    """Return the cleaned, NSFW-filtered "title. selftext" text of a submission."""
    title = filter_nsfw_words(clean_text(submission.title))
    selftext = filter_nsfw_words(clean_text(submission.selftext))
    if selftext:
        return f"{title}. {selftext}"
    return f"{title}"


def fetch_top_post_and_comments(subreddit_url, filter_type="hot", time_filter="day", max_duration=45, min_duration=30, max_retries=10):
    """Fetch the top post and comments from a subreddit URL with strict duration enforcement.

    Up to 20 posts are listed ("hot", or "top" with *time_filter*); a random
    non-NSFW post is tried per attempt, for at most *max_retries* attempts.
    A post is accepted when its text plus the collected top-level comments
    has an estimated audio duration within [min_duration, max_duration] and
    at least one comment was collected.

    Returns:
        (post_content, top_comments) - a string and a list of strings.

    Raises:
        ValueError: on any failure (no suitable post, Reddit API error, ...).
    """
    try:
        if reddit is None:
            raise ValueError("Reddit client is not initialized.")

        # Extract subreddit name
        subreddit_name = subreddit_url.rstrip("/").split("/")[-1]
        subreddit = reddit.subreddit(subreddit_name)
        logger.info(f"Fetching posts from subreddit: {subreddit_name}")

        # Fetch posts based on filter type
        if filter_type == "hot":
            listing = subreddit.hot(limit=20)
        else:
            listing = subreddit.top(limit=20, time_filter=time_filter)
        posts = list(listing)

        for attempt in range(1, max_retries + 1):
            logger.info(f"Attempt {attempt}/{max_retries} to find suitable post")

            # Keep only posts that are NOT marked NSFW
            suitable_posts = [post for post in posts if not post.over_18]
            if not suitable_posts:
                raise ValueError("No suitable posts found.")

            # Randomly select a post and remove it from the pool
            top_post = random.choice(suitable_posts)
            posts.remove(top_post)

            post_content = _build_post_content(top_post)
            total_duration = estimate_audio_duration(post_content)
            logger.info(f"Post content duration: {total_duration}s")

            if total_duration > max_duration:
                logger.debug(f"Post content too long ({total_duration}s), trying another")
                continue

            # Collect comments while staying within duration limits
            top_comments = []
            top_post.comments.replace_more(limit=0)
            for comment in top_post.comments:
                if not isinstance(comment, praw.models.Comment):
                    continue
                comment_text = clean_text(comment.body)

                # Skip deleted/removed comments
                if comment_text.strip().lower() in _EMPTY_COMMENT_BODIES:
                    continue

                # Filter NSFW words
                comment_text = filter_nsfw_words(comment_text)
                comment_duration = estimate_audio_duration(comment_text)
                logger.debug(f"Comment duration: {comment_duration}s")

                if total_duration + comment_duration > max_duration:
                    break

                top_comments.append(comment_text)
                total_duration += comment_duration

            # Verify final duration
            if min_duration <= total_duration <= max_duration and top_comments:
                logger.info(f"Found suitable content with {len(top_comments)} comments")
                return post_content, top_comments

            logger.warning(f"Content duration ({total_duration}s) outside acceptable range or no valid comments")

        raise ValueError("Unable to find suitable post within retry limit.")

    except praw.exceptions.PRAWException as e:
        logger.error(f"Reddit API error: {e}")
        raise ValueError(f"Failed to fetch Reddit content: {e}") from e
    except Exception as e:
        logger.error(f"Unexpected error fetching content: {e}")
        raise ValueError(f"Failed to fetch Reddit content: {e}") from e


def fetch_post_and_comments_from_url(post_url, max_duration=45, min_duration=30):
    """Fetch a specific Reddit post and its comments using the post URL.

    Top-level comments are added in listing order until the estimated audio
    duration reaches *min_duration*; a comment that would push the total past
    *max_duration* is skipped in favour of later, shorter ones.

    Returns:
        (post_content, top_comments) - a string and a non-empty list of strings.

    Raises:
        ValueError: if the post is NSFW, the post alone exceeds
            *max_duration*, the content stays below *min_duration*, or no
            usable comment is found. Errors raised by praw propagate unchanged.
    """
    reddit_client = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )

    # Fetch the submission (post) using the URL
    submission = reddit_client.submission(url=post_url)
    logger.info(f"Fetching post from URL: {post_url}")

    if submission.over_18:
        logger.warning("Post is marked as NSFW")
        raise ValueError("The post is marked as NSFW and cannot be processed.")

    post_content = _build_post_content(submission)
    total_duration = estimate_audio_duration(post_content)
    logger.info(f"Post content duration: {total_duration}s")

    # A post that is too long on its own is rejected (it is not truncated)
    if total_duration > max_duration:
        logger.warning(f"Post content too long ({total_duration}s > {max_duration}s)")
        raise ValueError(f"Post content alone exceeds maximum duration ({total_duration}s > {max_duration}s)")

    # Fetch comments while respecting duration constraints
    top_comments = []
    submission.comments.replace_more(limit=0)

    # Keep collecting comments until we hit minimum duration or run out of comments
    for comment in submission.comments:
        if not isinstance(comment, praw.models.Comment):
            continue
        comment_text = clean_text(comment.body)

        # Skip deleted/removed comments
        if comment_text.strip().lower() in _EMPTY_COMMENT_BODIES:
            continue

        # Filter NSFW words
        comment_text = filter_nsfw_words(comment_text)
        comment_duration = estimate_audio_duration(comment_text)
        logger.debug(f"Potential comment duration: {comment_duration}s")

        # Check if adding this comment would exceed max duration
        if total_duration + comment_duration > max_duration:
            if total_duration >= min_duration:
                logger.info(f"Reached sufficient duration ({total_duration}s), skipping remaining comments")
                break
            logger.debug(f"Comment would exceed max duration ({total_duration + comment_duration}s > {max_duration}s), but haven't reached min duration yet. Looking for shorter comments...")
            continue

        top_comments.append(comment_text)
        total_duration += comment_duration
        logger.debug(f"Added comment. New total duration: {total_duration}s")

        # If we've reached minimum duration, we can stop
        if total_duration >= min_duration:
            logger.info(f"Reached minimum duration ({total_duration}s)")
            break

    # Now check if we have enough content
    if total_duration < min_duration:
        logger.warning(f"Content too short ({total_duration}s < {min_duration}s)")
        raise ValueError(f"Content is too short ({total_duration}s) to generate audio in the desired duration range ({min_duration}s-{max_duration}s).")

    if not top_comments:
        logger.warning("No valid comments found")
        raise ValueError("No valid comments found in the post.")

    logger.info(f"Successfully fetched post with {len(top_comments)} comments. Total duration: {total_duration}s")
    return post_content, top_comments


def create_video_from_story(story_text, selected_voice, rate, pitch, background, pexels_keywords=None):
    """Turn free-form story text into a narrated, subtitled video.

    The text is cleaned and NSFW-filtered, converted to speech, transcribed
    into subtitle clips and composed over the chosen background.

    Returns:
        (video_path, video_url, message); the first two are None on failure.
    """
    cleaned_story = filter_nsfw_words(clean_text(story_text or ""))

    audio_path = asyncio.run(text_to_speech(cleaned_story, voice=selected_voice, rate=rate, pitch=pitch))
    if not audio_path:
        return None, None, "Failed to generate audio from story"

    try:
        subtitles = generate_subtitles(audio_path)
        if subtitles is None:
            return None, None, "Failed to generate subtitles"

        timestamp = int(time.time())
        output_path = f"/tmp/story_{timestamp}.mp4"

        video_path, video_url = create_video_with_background(
            audio_path=audio_path,
            subtitles=subtitles,
            subreddit_url="story",
            selected_font="Mouldy Cheese",
            background=background,
            output_path=output_path,
            pexels_keywords=pexels_keywords
        )
        if not video_path:
            return None, None, "Failed to create video"

        return video_path, video_url, "Video generated successfully from story!"
    finally:
        # The intermediate audio file is removed on every path, not only on success.
        try:
            if os.path.exists(audio_path):
                os.remove(audio_path)
        except Exception as e:
            logger.warning(f"Failed to clean up audio: {e}")
def estimate_audio_duration(text, words_per_second=3.5, pause_per_sentence=1.0, pause_per_comment=1.5): """Estimate the duration of the audio based on text length and pauses.""" word_count = len(text.split()) sentence_count = text.count('.') + text.count('!') + text.count('?') comment_count = text.count('. Comments: ') # Count comment transitions duration = (word_count / words_per_second) + (sentence_count * pause_per_sentence) + (comment_count * pause_per_comment) return duration async def text_to_speech(text, voice="en-US-GuyNeural - Male (American)", rate=0, pitch=0): """Convert text to speech using edge-tts.""" try: # Extract voice ID from the display string (e.g., "en-US-GuyNeural - Male (American)" -> "en-US-GuyNeural") voice_id = voice.split(" - ")[0] if " - " in voice else voice # Format rate and pitch as required by edge-tts rate_str = f"{rate:+d}%" pitch_str = f"{pitch:+d}Hz" # Create output directory if it doesn't exist os.makedirs("temp", exist_ok=True) # Generate unique filename timestamp = int(time.time()) output_file = f"temp/speech_{timestamp}.mp3" logger.info(f"Generating TTS with voice: {voice_id}, rate: {rate_str}, pitch: {pitch_str}") # Configure voice settings communicate = edge_tts.Communicate(text, voice_id, rate=rate_str, volume='+0%', pitch=pitch_str) # Convert to audio await communicate.save(output_file) if os.path.exists(output_file): logger.info(f"Successfully generated audio file: {output_file}") return output_file else: logger.error("Failed to generate audio file") return None except Exception as e: logger.error(f"Error in text_to_speech: {e}") return None def generate_subtitles(audio_path): try: if not audio_path or not os.path.exists(audio_path): logger.error(f"Invalid audio path: {audio_path}") return None # Initialize AssemblyAI aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY") if not aai.settings.api_key: logger.error("AssemblyAI API key not found") return None transcriber = aai.Transcriber() logger.info(f"Uploading audio 
file: {audio_path}") with open(audio_path, "rb") as audio_file: transcript = transcriber.transcribe(audio_file) transcript.wait_for_completion() words = transcript.words if not words: logger.error("No words found in transcript") return None logger.info(f"Received {len(words)} words from transcription") def create_text_image(current_word, context_words, size=(1000, 150)): # Create a new image with an RGBA mode for transparency image = Image.new('RGBA', size, (0, 0, 0, 0)) # Fully transparent background draw = ImageDraw.Draw(image) try: font = ImageFont.truetype(FONT_PATH, 72) except: logger.warning(f"Failed to load font from {FONT_PATH}, using default") font = ImageFont.load_default() # Join all words with spaces, but add markers around the current word text_parts = [] for word in context_words: if word == current_word: text_parts.append(f"[{word}]") # Mark current word else: text_parts.append(word) full_text = " ".join(text_parts) # Get text bounding box bbox = draw.textbbox((0, 0), full_text, font=font) text_width = bbox[2] - bbox[0] text_height = bbox[3] - bbox[1] # Calculate position to center the text x = (size[0] - text_width) // 2 y = (size[1] - text_height) // 2 # Draw each word with appropriate color current_x = x for word in context_words: word_with_space = word + " " word_bbox = draw.textbbox((0, 0), word_with_space, font=font) word_width = word_bbox[2] - word_bbox[0] # Highlight current word in yellow, others in white color = (255, 255, 0, 255) if word == current_word else (255, 255, 255, 255) # Full opacity for text draw.text((current_x, y), word_with_space, font=font, fill=color) current_x += word_width # Convert RGBA to RGB array with alpha channel img_array = np.array(image) return img_array subtitles = [] window_size = 4 # Number of words to show at once for i, current_word in enumerate(words): try: # Get context words (previous and next words) start_idx = max(0, i - window_size // 2) end_idx = min(len(words), i + window_size // 2 + 1) context_words 
= [w.text for w in words[start_idx:end_idx]] # Create image with text img_array = create_text_image(current_word.text, context_words) # Convert to ImageClip clip = ImageClip(img_array) # Set timing start_time = float(current_word.start) / 1000 duration = float(current_word.end - current_word.start) / 1000 # Position and time the clip clip = (clip .set_position(('center', 'center')) .set_start(start_time) .set_duration(duration)) subtitles.append(clip) logger.info(f"Successfully created clip for word: {current_word.text}") except Exception as clip_error: logger.error(f"Failed to create clip for word '{current_word.text}'. Error: {clip_error}") continue if not subtitles: logger.error("No valid subtitle clips were created") return None logger.info(f"Successfully generated {len(subtitles)} subtitle clips") return subtitles except Exception as e: logger.error(f"Error generating subtitles: {e}") logger.exception("Full traceback:") return None def is_post_url(url): """Check if the URL is a direct post URL.""" if not url: return False return bool(re.match(r'https?://(?:www\.)?reddit\.com/r/\w+/comments/\w+/?', url)) def generate_audio_from_reddit(url, filter_type, time_filter, selected_voice, rate, pitch): try: # Get content based on URL type try: if is_post_url(url): logger.info("Processing direct post URL...") post_content, comments = fetch_post_and_comments_from_url(url) else: logger.info("Processing subreddit URL...") post_content, comments = fetch_top_post_and_comments(url, filter_type, time_filter) except ValueError as e: logger.error(f"Error fetching content: {e}") return None except Exception as e: logger.error(f"Unexpected error fetching content: {e}") return None if not post_content or not comments: logger.error("Failed to get Reddit content") return None # Combine content combined_content = f"{post_content}. Hey, Listen:" for idx, comment in enumerate(comments, start=1): combined_content += f"{comment}. 
" # Generate audio audio_path = asyncio.run(text_to_speech(combined_content, voice=selected_voice, rate=rate, pitch=pitch)) if not audio_path: logger.error("Failed to generate audio") return None logger.info(f"Successfully generated audio at: {audio_path}") return audio_path except Exception as e: logger.error(f"Error in generate_audio_from_reddit: {e}") return None def search_pexels_video(query, page=1, per_page=5): if not PEXELS_API_KEY: logger.error("PEXELS_API_KEY is missing from environment variables") return None # Use only the first keyword (before comma) # query = keywords.split(",")[0].strip() # This was already handled, query is now a single keyword logger.info(f"Searching Pexels for: '{query}', page: {page}, per_page: {per_page}") headers = {"Authorization": PEXELS_API_KEY} params = {"query": query, "per_page": per_page, "page": page} response = requests.get("https://api.pexels.com/videos/search", headers=headers, params=params) logger.debug(f"Pexels API response status: {response.status_code} for query '{query}' page {page}") if response.status_code != 200: logger.error(f"Pexels API error: {response.text}") return None data = response.json() logger.info(f"Found {len(data.get('videos', []))} videos for query: {query}") videos_on_page = data.get("videos", []) if not videos_on_page: logger.warning(f"No videos found on page {page} for query: {query}") return None # To introduce more variety, we can pick a random video from the current page's results # instead of always the "best" or first one. 
selected_video_data = random.choice(videos_on_page) video_files = selected_video_data.get("video_files", []) if not video_files: logger.warning(f"Selected video (ID: {selected_video_data.get('id')}) has no video_files.") return None # Filter for minimum resolution (optional) and then pick highest available for that video filtered_files = [f for f in video_files if f["width"] >= 720] # Example: min width 720p files_to_consider = filtered_files or video_files # Fallback to all files if none meet filter if not files_to_consider: logger.warning(f"No suitable video files after filtering for video ID: {selected_video_data.get('id')}") return None # Pick the highest resolution available from the considered files sorted_files = sorted(files_to_consider, key=lambda f: f["width"] * f["height"], reverse=True) best_file = sorted_files[0] logger.info(f"Selected video file: {best_file['link']} ({best_file['width']}x{best_file['height']}) for query '{query}' page {page}") return best_file["link"] def _fetch_and_process_single_pexels_video(keyword_query, page_number, target_resolution=(1080, 1920)): """ Fetches a single video for a keyword, processes it (resize/crop), and returns the VideoFileClip object and its temporary path. The temporary file is NOT deleted by this function. 
""" url = search_pexels_video(keyword_query, page=page_number) if not url: logger.warning(f"No Pexels URL found for '{keyword_query}'.") return None, None # Indicate failure # Create a unique temporary file path temp_dir = "/tmp" os.makedirs(temp_dir, exist_ok=True) # Ensure /tmp exists temp_fd, temp_path = tempfile.mkstemp(suffix=".mp4", dir=temp_dir, prefix="pexels_") os.close(temp_fd) # Close the file descriptor, we just need the path video_clip_obj = None try: logger.info(f"Downloading Pexels clip from {url} to {temp_path}") with requests.get(url, stream=True, timeout=30) as r: # Added timeout r.raise_for_status() # Check for HTTP errors with open(temp_path, "wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) logger.info(f"Pexels clip downloaded to {temp_path}") if not os.path.exists(temp_path) or os.path.getsize(temp_path) == 0: logger.error(f"Downloaded Pexels clip is missing or empty: {temp_path}") if os.path.exists(temp_path): os.remove(temp_path) # Clean up failed download return None, None video_clip_obj = VideoFileClip(temp_path) # If the fetched Pexels video is longer than 3 seconds, subclip it to 3 seconds. if video_clip_obj.duration > 3.0: logger.info(f"Original Pexels clip for '{keyword_query}' is {video_clip_obj.duration:.2f}s long. 
Subclipping to 3.0s.") active_segment = video_clip_obj.subclip(0, 3.0) else: active_segment = video_clip_obj # Use the full clip if it's <= 3s # Resize and crop clip_aspect_ratio = active_segment.w / active_segment.h target_aspect_ratio = target_resolution[0] / target_resolution[1] if clip_aspect_ratio > target_aspect_ratio: resized_segment = active_segment.resize(height=target_resolution[1]) else: resized_segment = active_segment.resize(width=target_resolution[0]) final_processed_segment = resized_segment.crop( x_center=resized_segment.w / 2, # Use .w and .h of the *resized* segment y_center=resized_segment.h / 2, width=target_resolution[0], height=target_resolution[1] ) # DO NOT CLOSE video_clip_obj here if processed_clip is derived from it and shares the reader. # If processed_clip is a new object with its own reader after crop/resize, then original can be closed. # MoviePy operations like resize/crop usually create new clip instances that might share the reader or copy frames. # It's safer to let the caller manage the lifecycle of the returned clip and its source file. logger.info(f"Pexels clip for '{keyword_query}' downloaded and processed. 
Path: {temp_path}, Final Segment Duration: {final_processed_segment.duration:.2f}s") return final_processed_segment, temp_path # Return the processed clip and its temp path except requests.exceptions.RequestException as e: logger.error(f"Failed to download Pexels video for '{keyword_query}': {e}") if os.path.exists(temp_path): os.remove(temp_path) return None, None except Exception as e: logger.error(f"Failed to process Pexels video: {e}") if video_clip_obj: video_clip_obj.close() # Close if an error occurred after opening if os.path.exists(temp_path): os.remove(temp_path) # Clean up on error return None, None def get_video_clip(background_type, duration, target_resolution=(1080, 1920), pexels_keywords=None): """Get a video clip with random start time and specified duration.""" logger.info(f"[DEBUG] background_type: '{background_type}', pexels_keywords: '{pexels_keywords}'") try: if background_type == "Pexels": if not pexels_keywords or not isinstance(pexels_keywords, list) or not any(kw.strip() for kw in pexels_keywords): logger.warning("No valid Pexels keywords provided or list is empty. Using solid green background.") return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] collected_clips = [] temp_files_to_delete_later = [] current_total_duration = 0.0 # --- New Pexels clip collection logic for grouping and duration fill --- # The pexels_keywords list is already stripped and filtered in tts_interface # before being passed to create_video_from_story or create_video_with_background. # So, pexels_keywords here should be a clean list of non-empty strings. # Cycle through keywords, fetching one clip per keyword per cycle, until duration is met. 
keyword_page_trackers = {kw: 1 for kw in pexels_keywords} # Track current page for each keyword MAX_FAILURES_PER_KEYWORD_TOTAL = 3 # Max total failures (no clip from any page) for a keyword before skipping it keyword_failure_counts = {kw: 0 for kw in pexels_keywords} active_keywords = list(pexels_keywords) # Keywords we are still trying to get clips from while current_total_duration < duration and any(active_keywords): keyword_processed_in_cycle = False for keyword_idx, keyword in enumerate(list(active_keywords)): # Iterate over a copy for safe removal if current_total_duration >= duration: break if keyword_failure_counts[keyword] >= MAX_FAILURES_PER_KEYWORD_TOTAL: if keyword in active_keywords: active_keywords.remove(keyword) # Stop trying this keyword continue current_page = keyword_page_trackers[keyword] logger.info(f"Trying keyword '{keyword}', page {current_page}. Total duration: {current_total_duration:.2f}s / {duration:.2f}s") clip_segment, temp_file_path = _fetch_and_process_single_pexels_video(keyword, current_page, target_resolution) if clip_segment and temp_file_path: collected_clips.append(clip_segment) temp_files_to_delete_later.append(temp_file_path) current_total_duration += clip_segment.duration keyword_page_trackers[keyword] += 1 # Move to next page for this keyword on its next turn keyword_failure_counts[keyword] = 0 # Reset failure count on success keyword_processed_in_cycle = True logger.info(f"Added clip for '{keyword}' (page {current_page}). Segment: {clip_segment.duration:.2f}s. Total: {current_total_duration:.2f}s") else: logger.warning(f"No clip found for '{keyword}' on page {current_page}.") keyword_failure_counts[keyword] += 1 keyword_page_trackers[keyword] += 1 # Still try next page next time if keyword_failure_counts[keyword] >= MAX_FAILURES_PER_KEYWORD_TOTAL: logger.info(f"Max total failures reached for keyword '{keyword}'. 
Removing from active list.") if keyword in active_keywords: active_keywords.remove(keyword) if not keyword_processed_in_cycle and any(active_keywords): # If a full cycle through active keywords yields nothing logger.info("A full cycle through active keywords yielded no new clips. Stopping Pexels search.") break # --- End of Pexels clip collection logic --- if not collected_clips: logger.warning("No Pexels clips were collected. Using solid green background.") return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] # Concatenate all collected clips logger.info(f"Concatenating {len(collected_clips)} Pexels clips.") final_pexels_video = concatenate_videoclips(collected_clips, method="compose") if final_pexels_video.duration == 0: # Should not happen if collected_clips is not empty logger.error("Concatenated Pexels video has zero duration. This should not happen if clips were collected. Using solid green.") for p_clip in collected_clips: p_clip.close() # Close individual segments for f_path in temp_files_to_delete_later: # Delete their temp files if os.path.exists(f_path): os.remove(f_path) return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] if final_pexels_video.duration > duration: logger.info(f"Subclipping concatenated Pexels video (duration {final_pexels_video.duration:.2f}s) to target duration ({duration:.2f}s)") final_pexels_video = final_pexels_video.subclip(0, duration) else: logger.info(f"Collected Pexels video duration {final_pexels_video.duration:.2f}s. Target audio duration is {duration:.2f}s. Video will be adjusted to audio duration during final composition if shorter.") return final_pexels_video, temp_files_to_delete_later # --- Handling for other background types (Minecraft, Cake Making, etc.) 
--- # Define video paths relative to the script location script_dir = os.path.dirname(os.path.abspath(__file__)) video_paths = { "Minecraft": os.path.join(script_dir, "Minecraft.mp4"), "Cake Making": os.path.join(script_dir, "A Collection OF CAKE Oddly Satisfying Chocolate Cake You Never Seen _ Awesome Cake Decorating Ideas.mp4"), "Satisfying ART": os.path.join(script_dir, "TOP 80 Satisfying Art Videos _ Best of The Year Quantastic.mp4"), } # Handle solid color backgrounds if background_type == "Black": logger.info("Creating solid black background") return ColorClip(size=target_resolution, color=(0, 0, 0)).set_duration(duration), [] elif background_type == "Green": logger.info("Creating solid green background") return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] # Handle predefined video backgrounds if background_type not in video_paths: logger.error(f"Invalid background type: {background_type}") logger.info(f"Available backgrounds: {list(video_paths.keys())}") return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] # Default to green video_path = video_paths[background_type] if not os.path.exists(video_path): logger.error(f"Background video not found: {video_path}") logger.info(f"Looking in directory: {script_dir}") logger.info(f"Available files: {os.listdir(script_dir)}") return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), [] logger.info(f"Loading background video: {video_path}") video = VideoFileClip(video_path) # This clip needs to be closed later max_start = max(0, video.duration - duration) start_time = random.uniform(0, max_start) if max_start > 0 else 0 logger.info(f"Selected start time: {start_time:.2f}s") clip = video.subclip(start_time, start_time + duration) target_aspect = target_resolution[0] / target_resolution[1] clip_aspect = clip.w / clip.h if clip_aspect > target_aspect: new_width = int(clip.h * target_aspect) x_center = clip.w // 2; x1 = x_center - 
(new_width // 2); x2 = x_center + (new_width // 2) clip = clip.crop(x1=x1, x2=x2) else: new_height = int(clip.w / target_aspect) y_center = clip.h // 2; y1 = y_center - (new_height // 2); y2 = y_center + (new_height // 2) clip = clip.crop(y1=y1, y2=y2) clip = clip.resize(target_resolution) if clip.duration < duration: clip = clip.loop(duration=duration) # For non-Pexels, the original 'video' object needs to be closed if 'clip' is derived. # However, 'clip' itself is what's returned. If 'clip' is a subclip, it shares the reader. # It's safer to close 'video' after 'clip' is fully processed by the caller. # For simplicity here, we assume 'clip' is self-contained enough or its reader is managed by MoviePy. # A more robust solution would track 'video' for later closure. # For now, returning just the clip and an empty list for temp files. # The 'video' object will be closed by Python's GC if not explicitly closed, but explicit is better. # This part needs careful thought on resource management if we were to return 'video' for closure. # Since we return 'clip', and 'video' is local, it should be fine. # The main issue was with Pexels temp files. 
def _close_clip_quietly(clip, description):
    """Close a moviepy clip, logging a warning instead of raising on failure."""
    if clip is None:
        return
    try:
        clip.close()
    except Exception as e:
        logger.warning(f"Error during cleanup of {description}: {e}")


def _remove_pexels_temp_files(paths):
    """Delete the given temp files; missing files are skipped, other errors are logged."""
    for temp_f_path in paths:
        try:
            os.remove(temp_f_path)
        except FileNotFoundError:
            continue
        except Exception as e_del:
            logger.warning(f"Error deleting Pexels temp file {temp_f_path}: {e_del}")
        else:
            logger.info(f"Cleaned up Pexels temp file: {temp_f_path}")


def create_video_with_background(audio_path, subtitles, subreddit_url, selected_font="Mouldy Cheese", background="Green", output_path="/tmp/output_video.mp4", pexels_keywords=None):
    """Render the final video and upload it to the Hugging Face dataset repo.

    The background clip is obtained from ``get_video_clip``, the subtitle clips
    are composited on top of it, the narration audio is attached, and the
    result is written to ``output_path`` and then uploaded.

    Args:
        audio_path: Path of an existing audio file used as the soundtrack.
        subtitles: List of clips layered over the background.
        subreddit_url: Accepted for interface compatibility; not used here.
        selected_font: Accepted for interface compatibility; not used here.
        background: Background name forwarded to ``get_video_clip``.
        output_path: Destination path of the rendered video file.
        pexels_keywords: Keywords forwarded to ``get_video_clip``.

    Returns:
        ``(output_path, video_url)`` on success, ``(None, None)`` on any
        failure. Errors are logged rather than raised.

    Note:
        All clips involved (including the caller's ``subtitles``) are closed
        and downloaded Pexels temp files are removed on every exit path once
        rendering has been attempted.
    """
    logger.info(f"[DEBUG] create_video_with_background received pexels_keywords: '{pexels_keywords}'")

    # Defined up front so the cleanup code can always iterate over it, even
    # when get_video_clip returns a bare clip instead of (clip, temp_files).
    pexels_temp_files_to_clean = []

    try:
        logger.info("Starting video creation process...")

        # Validate audio path
        if not isinstance(audio_path, str) or not os.path.exists(audio_path):
            logger.error(f"Invalid audio path: {audio_path}")
            return None, None

        # Validate subtitles
        if not isinstance(subtitles, list):
            logger.error(f"Invalid subtitles format: {type(subtitles)}")
            return None, None

        audio = None
        background_clip = None
        video = None
        try:
            logger.info(f"Loading audio file: {audio_path}")
            try:
                audio = AudioFileClip(audio_path)
                duration = audio.duration
                logger.info(f"Audio duration: {duration} seconds")
            except Exception as e:
                logger.error(f"Error loading audio file: {e}")
                return None, None

            try:
                logger.info(f"Creating background clip with {background} background...")
                result_bg = get_video_clip(
                    background,
                    duration,
                    target_resolution=(1080, 1920),
                    pexels_keywords=pexels_keywords,
                )
                if isinstance(result_bg, tuple):
                    background_clip, temp_files = result_bg
                    pexels_temp_files_to_clean = list(temp_files or [])
                else:
                    # Bare clip returned: there are no temp files to clean up.
                    background_clip = result_bg

                if background_clip is None:
                    logger.error("Failed to create background clip")
                    return None, None
                logger.info("Background clip created successfully")
            except Exception as e:
                logger.error(f"Error creating background clip: {e}")
                return None, None

            try:
                logger.info(f"Combining {len(subtitles)} subtitle clips with background...")
                final_clips = [background_clip] + subtitles
                video = CompositeVideoClip(final_clips, size=(1080, 1920))
                logger.info("Clips combined successfully")
            except Exception as e:
                logger.error(f"Error combining clips: {e}")
                return None, None

            try:
                logger.info("Setting audio to video...")
                video = video.set_audio(audio)
                logger.info("Audio set successfully")
            except Exception as e:
                logger.error(f"Error setting audio: {e}")
                return None, None

            try:
                logger.info(f"Writing video to file: {output_path}")
                video.write_videofile(
                    output_path,
                    fps=30,
                    codec='libx264',
                    audio_codec='aac',
                    temp_audiofile='temp-audio.m4a',
                    remove_temp=True,
                    logger=None
                )
                logger.info("Video written successfully")
            except Exception as e:
                logger.error(f"Error writing video file: {e}")
                logger.exception("Full traceback:")
                return None, None
        finally:
            # Runs on every exit path above (success or early return). Each
            # clip is closed independently so one failing close() cannot
            # prevent the others from being released.
            _close_clip_quietly(video, "composite video")
            _close_clip_quietly(audio, "audio clip")
            _close_clip_quietly(background_clip, "background clip")

            # A Pexels background may be a composite; close its sub-clips too.
            pexels_segments = getattr(background_clip, 'clips', None)
            if background == "Pexels" and pexels_segments:
                logger.info(f"Attempting to close {len(pexels_segments)} Pexels sub-clips.")
                for pexels_segment_clip in pexels_segments:
                    _close_clip_quietly(pexels_segment_clip, "Pexels sub-clip")

            for clip in subtitles:
                _close_clip_quietly(clip, "subtitle clip")

            _remove_pexels_temp_files(pexels_temp_files_to_clean)

        if os.path.exists(output_path):
            logger.info(f"Video successfully created at: {output_path}")

            # Upload to Hugging Face Dataset
            hf_api = HfApi()
            dataset_repo = "lolhaha002/redditbotdata"  # Change this to your dataset name

            logger.info(f"Uploading video to Hugging Face Dataset: {output_path}")
            video_filename = os.path.basename(output_path)
            hf_api.upload_file(
                path_or_fileobj=output_path,
                path_in_repo=f"videos/{video_filename}",
                repo_id=dataset_repo,
                repo_type="dataset"
            )

            # Generate public URL
            video_url = f"https://huggingface.co/datasets/{dataset_repo}/resolve/main/videos/{video_filename}"
            logger.info(f"Video uploaded successfully: {video_url}")

            return output_path, video_url

        logger.error("Video file not found after creation")
        return None, None

    except Exception as e:
        # Clips and temp files were already released by the inner ``finally``
        # if rendering had started, so only the error itself is reported here.
        logger.error(f"Unexpected error in video creation: {e}")
        logger.exception("Full traceback:")
        return None, None
def _parse_pexels_keywords(raw_keywords):
    """Split a comma-separated string into trimmed, non-empty keywords ([] for non-strings)."""
    if not isinstance(raw_keywords, str):
        return []
    return [kw.strip() for kw in raw_keywords.split(",") if kw.strip()]


def tts_interface(subreddit_url, story_text, filter_type, time_filter, selected_voice, rate, pitch, background, pexels_keywords):
    """Gradio callback: build a narrated video from a pasted story or a Reddit URL.

    When ``story_text`` is non-empty the work is delegated to
    ``create_video_from_story`` and its result is returned as-is. Otherwise
    audio is generated from the Reddit URL, subtitles are generated from the
    audio, and the video is rendered by ``create_video_with_background``.

    Args:
        subreddit_url: Subreddit or post URL; may be empty or ``None``.
        story_text: User-supplied story; takes precedence over the URL.
        filter_type: Forwarded to ``generate_audio_from_reddit``.
        time_filter: Forwarded to ``generate_audio_from_reddit``.
        selected_voice: Forwarded to the audio/story generators.
        rate: Forwarded to the audio/story generators.
        pitch: Forwarded to the audio/story generators.
        background: Background name for the video.
        pexels_keywords: Comma-separated keyword string from the UI.

    Returns:
        In Reddit mode, ``(video_path, video_url, status_message)``;
        ``video_path`` and ``video_url`` are ``None`` on failure.
    """
    logger.info(f"[DEBUG] tts_interface received pexels_keywords: '{pexels_keywords}'")

    # Tracked outside the try so the ``finally`` can delete the temporary
    # audio file on every exit path, not only after a successful render.
    audio_path = None
    try:
        logger.info("Starting TTS interface process...")
        logger.info(f"Selected background: {background}")

        # Story Logic
        story_text = (story_text or "").strip()
        subreddit_url = (subreddit_url or "").strip()

        if not story_text and not subreddit_url:
            return None, None, "Please provide either a story or a Reddit URL."

        if story_text:
            keywords_list_story = _parse_pexels_keywords(pexels_keywords)
            logger.info(f"[DEBUG] Parsed keywords list for story: {keywords_list_story}")
            return create_video_from_story(story_text, selected_voice, rate, pitch, background, pexels_keywords=keywords_list_story)

        # Generate audio
        logger.info("Generating audio from Reddit content...")
        audio_path = generate_audio_from_reddit(subreddit_url, filter_type, time_filter, selected_voice, rate, pitch)
        if not audio_path or not isinstance(audio_path, str):
            logger.error(f"Invalid audio path returned: {audio_path}")
            return None, None, "Failed to generate audio: Content not suitable or contains NSFW material"
        logger.info(f"Audio generated successfully: {audio_path}")

        # Generate subtitles
        logger.info(f"Generating subtitles")
        subtitles = generate_subtitles(audio_path)
        if subtitles is None:
            logger.error("Failed to generate subtitles")
            return None, None, "Failed to generate subtitles"
        logger.info(f"Generated {len(subtitles)} subtitle clips")

        # Create unique output path. The subreddit name comes from user input,
        # so restrict it to filename-safe characters (drops query strings etc.).
        timestamp = int(time.time())
        subreddit_name = "unknown"
        if "reddit.com/r/" in subreddit_url:
            raw_name = subreddit_url.split("reddit.com/r/")[-1].split("/")[0]
            subreddit_name = re.sub(r"[^A-Za-z0-9_-]", "", raw_name) or "unknown"
        video_filename = f"{subreddit_name}_{timestamp}.mp4"
        output_path = f"/tmp/{video_filename}"
        logger.info(f"Creating video with output path: {output_path}")

        # Create video
        logger.info(f"[DEBUG] Raw pexels_keywords input: '{pexels_keywords}'")
        keywords_list = _parse_pexels_keywords(pexels_keywords)
        logger.info(f"[DEBUG] Parsed keywords list: {keywords_list}")
        if keywords_list:
            for i, kw in enumerate(keywords_list, start=1):
                logger.info(f"[DEBUG] Keyword {i}: '{kw}'")
        elif background == "Pexels":
            # Keywords only matter for the Pexels background.
            logger.warning("No valid Pexels keywords found after parsing.")

        video_path, video_url = create_video_with_background(
            audio_path=audio_path,
            subtitles=subtitles,
            subreddit_url=subreddit_url,
            selected_font="Mouldy Cheese",
            background=background,
            output_path=output_path,
            pexels_keywords=keywords_list
        )

        if video_path is None:
            logger.error("Failed to create video")
            return None, None, "Failed to create video"

        logger.info(f"Video created at: {video_path}")
        logger.info(f"Video URL: {video_url}")
        logger.info("Video generation process completed successfully")
        return video_path, video_url, "Video generated and uploaded successfully!"

    except Exception as e:
        logger.error(f"Error in TTS interface: {e}")
        logger.exception("Full traceback:")
        return None, None, f"Error: {str(e)}"

    finally:
        # Clean up the audio file only; the video file is kept for preview.
        if isinstance(audio_path, str) and audio_path:
            try:
                if os.path.exists(audio_path):
                    os.remove(audio_path)
                    logger.info(f"Cleaned up temporary audio file: {audio_path}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary files: {e}")
if __name__ == "__main__":
    # Dropdown entries are shown as "<voice id> - <description>".
    voice_labels = [f"{voice_id} - {description}" for voice_id, description in VOICE_OPTIONS.items()]
    background_names = list(BACKGROUND_OPTIONS)

    with gr.Blocks() as demo:
        gr.Markdown("""
        # Reddit to Video Generator
        Enter either:
        - A subreddit URL (e.g., https://www.reddit.com/r/AskReddit/) to get top posts
        - A direct post URL (e.g., https://www.reddit.com/r/AskReddit/comments/abc123/post_title/) to use that specific post

        If you are looking for a bulk shorts creator and/or wants to provide a support, please checkout my fiverr gigs https://www.fiverr.com/s/dDdbGXZ
        """)

        with gr.Row():
            # Left column: everything the user can configure.
            with gr.Column():
                url_input = gr.Textbox(
                    label="Reddit URL",
                    placeholder="Enter subreddit URL or direct post URL"
                )
                story_input = gr.Textbox(
                    label="Or Enter Your Own Story",
                    placeholder="Paste or write your story here (no Reddit needed)",
                    lines=6
                )

                # Word count is refreshed whenever the story text changes.
                word_count_display = gr.Textbox(label="Word Count", interactive=False)
                story_input.change(
                    fn=count_words,
                    inputs=story_input,
                    outputs=word_count_display
                )

                filter_type = gr.Dropdown(
                    ["hot", "top", "new"],
                    label="Filter Type (for subreddit URLs only)",
                    value="hot"
                )
                time_filter = gr.Dropdown(
                    ["hour", "day", "week", "month", "year", "all"],
                    label="Time Filter (for subreddit URLs only)",
                    value="day"
                )
                selected_voice = gr.Dropdown(
                    choices=voice_labels,
                    value="en-US-GuyNeural - Male (American)",
                    label="Voice"
                )
                rate = gr.Slider(
                    minimum=-100,
                    maximum=100,
                    value=0,
                    step=10,
                    label="Voice Speed"
                )
                pitch = gr.Slider(
                    minimum=-100,
                    maximum=100,
                    value=0,
                    step=10,
                    label="Voice Pitch"
                )
                background = gr.Dropdown(
                    background_names,
                    label="Background",
                    value="Green"
                )
                pexels_keywords = gr.Textbox(
                    label="Pexels Keywords (for video background)",
                    placeholder="e.g., nature, city night, forest"
                )

            # Right column: results of a generation run.
            with gr.Column():
                video_preview = gr.Video(label="Video Preview")
                dataset_url = gr.Textbox(label="Dataset URL (Click to View)")
                status_text = gr.Textbox(label="Status")

        # NOTE(review): the button's nesting relative to the row/columns could not
        # be recovered from the flattened source; confirm the intended layout.
        submit_btn = gr.Button("Generate Video")

        # Component order here must match the parameter order of tts_interface.
        generation_inputs = [
            url_input,
            story_input,
            filter_type,
            time_filter,
            selected_voice,
            rate,
            pitch,
            background,
            pexels_keywords,
        ]
        generation_outputs = [video_preview, dataset_url, status_text]
        submit_btn.click(
            fn=tts_interface,
            inputs=generation_inputs,
            outputs=generation_outputs,
            queue=True
        )

        # Debug-only listener on the keywords box; it produces no outputs.
        @gr.on(inputs=[pexels_keywords])
        def debug_pexels_input(text):
            logger.debug(f"[DEBUGUI] Received Pexels keywords via UI event: '{text}'")
            return None

    demo.launch(share=True)