# redditbot / app.py
# Uploaded by lolhaha002
# Commit 40efc14 (verified): Rename app2.py to app.py
import os
import praw
import time
import logging
import gradio as gr
from dotenv import load_dotenv
import re
from pydub import AudioSegment
import asyncio
import tempfile
import edge_tts
import random
import assemblyai as aai
from moviepy.config import change_settings
from moviepy.editor import *
from moviepy.editor import TextClip, CompositeVideoClip, AudioFileClip, ColorClip
from PIL import Image, ImageDraw, ImageFont
import numpy as np
from huggingface_hub import HfApi, login
import requests
# Configure root logging exactly once (INFO level, timestamped records) and
# create the module logger. A second basicConfig() call would be a no-op
# because the root logger already has a handler.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)

# Best-effort install of the DejaVu font package. os.system() returns the
# shell's exit status, so report a failure instead of silently ignoring it.
_apt_status = os.system("apt-get update && apt-get install -y fonts-dejavu")
if _apt_status != 0:
    logger.warning(f"Font package installation exited with status {_apt_status}")

# Font file used to render the subtitle images.
FONT_PATH = "MouldyCheeseRegular-WyMWG.ttf"
# Fail fast at import time if the font file is missing.
if not os.path.exists(FONT_PATH):
    raise FileNotFoundError(f"Font file not found: {FONT_PATH}")
print(f"Using font at: {FONT_PATH}")

# Disable ImageMagick
change_settings({"IMAGEMAGICK_BINARY": None})
# Load environment variables (Hugging Face Secrets)
load_dotenv()
REDDIT_CLIENT_ID = os.getenv("REDDIT_CLIENT_ID")
REDDIT_CLIENT_SECRET = os.getenv("REDDIT_CLIENT_SECRET")
REDDIT_USER_AGENT = os.getenv("REDDIT_USER_AGENT")
PEXELS_API_KEY = os.getenv("PEXELS_API_KEY")

# Initialize the module-level Reddit client. Any failure is logged rather
# than raised, so the module still imports when credentials are missing.
try:
    reddit = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )
    # Test the connection once and echo the result (the original probed twice).
    print(reddit.user.me())
except Exception as e:
    logger.error(f"Failed to initialize Reddit client: {e}")
    logger.error("Please check your Reddit API credentials in the .env file")
    logger.error("Required environment variables: REDDIT_CLIENT_ID, REDDIT_CLIENT_SECRET, REDDIT_USER_AGENT")
# Voice IDs mapped to a short human-readable description.
# Insertion order is preserved, grouped by accent.
VOICE_OPTIONS = {
    # American English
    "en-US-GuyNeural": "Male (American)",
    "en-US-JennyNeural": "Female (American)",
    "en-US-AriaNeural": "Female (American)",
    # British English
    "en-GB-RyanNeural": "Male (British)",
    "en-GB-SoniaNeural": "Female (British)",
    "en-GB-LibbyNeural": "Female (British)",
    "en-GB-AlfieNeural": "Male (British)",
    "en-GB-ElliotNeural": "Male (British)",
    # Australian English
    "en-AU-NatashaNeural": "Female (Australian)",
    "en-AU-WilliamNeural": "Male (Australian)",
    "en-AU-AnnetteNeural": "Female (Australian)",
    "en-AU-CarlyNeural": "Female (Australian)",
    "en-AU-DuncanNeural": "Male (Australian)",
    # Indian English
    "en-IN-NeerjaNeural": "Female (Indian)",
    "en-IN-PrabhatNeural": "Male (Indian)",
    # Irish English
    "en-IE-ConnorNeural": "Male (Irish)",
    "en-IE-EmilyNeural": "Female (Irish)",
    # Canadian English
    "en-CA-ClaraNeural": "Female (Canadian)",
    "en-CA-LiamNeural": "Male (Canadian)",
}
# Selectable video backgrounds: option name -> description.
BACKGROUND_OPTIONS = {
    "Green": "Solid green background",
    "Black": "Solid black background",
    "Minecraft": "Minecraft gameplay",
    "Cake Making": "Oddly Satisfying Cake Making",
    "Satisfying ART": "Satisfying Art background",
    "Pexels": "Use stock videos based on keywords from Pexels",
}
# Directory to save audio files
os.makedirs("audio_outputs", exist_ok=True)

# Fetch the token from Hugging Face Secrets
HF_TOKEN = os.getenv("HF_TOKEN")
if HF_TOKEN:
    login(HF_TOKEN)
else:
    # Without a token, login() would fall back to an interactive prompt,
    # which cannot be answered in a headless deployment.
    logger.warning("HF_TOKEN is not set; skipping Hugging Face login")

# Initialize Hugging Face API
hf_api = HfApi()
def clean_text(text):
    """Remove emojis and unsupported characters from *text*.

    Characters outside the Basic Multilingual Plane are dropped first, then
    everything except word characters, whitespace and the punctuation
    ``. , ! ? ' " -``. Brackets are removed too, so ``"[deleted]"`` becomes
    ``"deleted"``.

    Args:
        text: Input string; ``None`` or ``""`` is accepted.

    Returns:
        The cleaned string, or ``""`` when *text* is ``None`` or empty.
    """
    if not text:
        return ""
    text = re.sub(r'[\U00010000-\U0010FFFF]+', '', text)  # astral-plane characters (most emojis)
    return re.sub(r'[^\w\s.,!?\'"-]', '', text)  # any remaining unsupported characters
def load_nsfw_words(file_path="nsfw_words.txt"):
    """Load NSFW words from a text file, one entry per line.

    Args:
        file_path: Path of the word list.

    Returns:
        A list of the non-blank lines, stripped and lower-cased, in file
        order. An empty list (with a logged warning) if the file does not
        exist.
    """
    if not os.path.exists(file_path):
        logger.warning(f"NSFW words file not found: {file_path}")
        return []
    # Decode explicitly as UTF-8 so the result does not depend on the locale.
    with open(file_path, "r", encoding="utf-8") as f:
        return [line.strip().lower() for line in f if line.strip()]
# Load the NSFW word list once at import time from the default file
# ("nsfw_words.txt"); the list is empty if that file does not exist.
NSFW_WORDS = load_nsfw_words()
# (word-list key, compiled pattern) for the most recently seen NSFW_WORDS.
_NSFW_PATTERN_STATE = (None, None)


def _nsfw_pattern(words):
    """Return a cached whole-word, case-insensitive regex for *words*, or None if empty."""
    global _NSFW_PATTERN_STATE
    key = tuple(words)
    cached_key, pattern = _NSFW_PATTERN_STATE
    if cached_key != key:
        # Longest alternatives first so a phrase wins over a shorter word it starts with.
        alternatives = sorted({w for w in key if w}, key=lambda w: (-len(w), w))
        pattern = None
        if alternatives:
            # "[^\W_]" matches exactly the alphanumeric characters, so the
            # lookarounds require a non-alphanumeric neighbour (or the string
            # edge) on both sides of the match.
            pattern = re.compile(
                r"(?<![^\W_])(?:" + "|".join(re.escape(w) for w in alternatives) + r")(?![^\W_])",
                re.IGNORECASE,
            )
        _NSFW_PATTERN_STATE = (key, pattern)
    return pattern


def filter_nsfw_words(text):
    """Replace every whole-word occurrence of an NSFW word with ``[beep]``.

    Matching is case-insensitive and only applies where the word is not
    directly adjacent to another alphanumeric character. The substitution is
    done in one pass over the original text, so offsets can never drift (the
    previous version searched a lower-cased copy whose length may differ) and
    inserted ``[beep]`` markers are never re-scanned.

    Args:
        text: Text to censor; falsy values are returned unchanged.

    Returns:
        The censored text.
    """
    if not text:
        return text
    pattern = _nsfw_pattern(NSFW_WORDS)
    if pattern is None:
        return text
    return pattern.sub("[beep]", text)
def contains_nsfw_words(text):
    """Return True if any NSFW word occurs in *text* as a case-insensitive substring."""
    lowered = text.lower()
    return any(word in lowered for word in NSFW_WORDS)
# Word counter for the story input
def count_words(text):
    """Return a label such as ``"12 words"`` for *text*; ``None`` counts as empty."""
    tokens = (text or "").split()
    return f"{len(tokens)} words"
def fetch_top_post_and_comments(subreddit_url, filter_type="hot", time_filter="day", max_duration=45, min_duration=30, max_retries=10):
    """Pick a random non-NSFW post from a subreddit whose narration fits the duration window.

    Up to 20 posts are fetched once ("hot", otherwise "top" for
    ``time_filter``). On each attempt a random remaining post is drawn, its
    title and selftext are cleaned and censored, and comments are added in
    listing order until the next one would push the estimated duration above
    ``max_duration``.

    Args:
        subreddit_url: Subreddit URL; the last path segment is used as the name.
        filter_type: ``"hot"`` for the hot listing, anything else for top.
        time_filter: Time filter passed to the top listing.
        max_duration: Maximum estimated narration length in seconds.
        min_duration: Minimum estimated narration length in seconds.
        max_retries: Maximum number of posts to try.

    Returns:
        ``(post_content, top_comments)`` where ``top_comments`` is a
        non-empty list of cleaned comment strings.

    Raises:
        ValueError: On any Reddit/API failure, when no eligible post remains,
            or when no post fits within ``max_retries`` attempts.
    """
    # Bodies Reddit shows for deleted/removed comments. clean_text() strips the
    # brackets, so the bare words are what is normally seen here.
    placeholder_bodies = {"deleted", "[deleted]", "removed", "[removed]"}
    try:
        # Extract subreddit name
        subreddit_name = subreddit_url.rstrip("/").split("/")[-1]
        subreddit = reddit.subreddit(subreddit_name)
        logger.info(f"Fetching posts from subreddit: {subreddit_name}")

        # Fetch posts based on filter type
        if filter_type == "hot":
            listing = subreddit.hot(limit=20)
        else:
            listing = subreddit.top(limit=20, time_filter=time_filter)
        posts = list(listing)

        for attempt in range(1, max_retries + 1):
            logger.info(f"Attempt {attempt}/{max_retries} to find suitable post")

            # Keep only posts that are NOT marked NSFW
            suitable_posts = [post for post in posts if not post.over_18]
            if not suitable_posts:
                raise ValueError("No suitable posts found.")

            # Randomly select a post and remove it from the pool
            top_post = random.choice(suitable_posts)
            posts.remove(top_post)

            # Clean and filter title and selftext
            title = filter_nsfw_words(clean_text(top_post.title))
            selftext = filter_nsfw_words(clean_text(top_post.selftext))
            post_content = f"{title}"
            if selftext:
                post_content += f". {selftext}"

            total_duration = estimate_audio_duration(post_content)
            logger.info(f"Post content duration: {total_duration}s")
            if total_duration > max_duration:
                logger.debug(f"Post content too long ({total_duration}s), trying another")
                continue

            # Collect comments while staying within duration limits
            top_comments = []
            top_post.comments.replace_more(limit=0)
            for comment in top_post.comments:
                if not isinstance(comment, praw.models.Comment):
                    continue
                comment_text = clean_text(comment.body)
                # Skip deleted/removed comments and ones that are empty after cleaning
                normalized = comment_text.strip().lower()
                if not normalized or normalized in placeholder_bodies:
                    continue
                # Filter NSFW words
                comment_text = filter_nsfw_words(comment_text)
                comment_duration = estimate_audio_duration(comment_text)
                logger.debug(f"Comment duration: {comment_duration}s")
                if total_duration + comment_duration > max_duration:
                    break
                top_comments.append(comment_text)
                total_duration += comment_duration

            # Verify final duration
            if min_duration <= total_duration <= max_duration and top_comments:
                logger.info(f"Found suitable content with {len(top_comments)} comments")
                return post_content, top_comments
            logger.warning(f"Content duration ({total_duration}s) outside acceptable range or no valid comments")

        raise ValueError("Unable to find suitable post within retry limit.")
    except praw.exceptions.PRAWException as e:
        logger.error(f"Reddit API error: {e}")
        raise ValueError(f"Failed to fetch Reddit content: {e}") from e
    except Exception as e:
        logger.error(f"Unexpected error fetching content: {e}")
        raise ValueError(f"Failed to fetch Reddit content: {e}") from e
def fetch_post_and_comments_from_url(post_url, max_duration=45, min_duration=30):
    """Fetch a specific Reddit post and enough comments to fill the duration window.

    Comments are taken in listing order. One that would push the estimated
    duration above ``max_duration`` is skipped while the total is still below
    ``min_duration``; collection stops as soon as ``min_duration`` is reached.

    Args:
        post_url: URL of the Reddit post.
        max_duration: Maximum estimated narration length in seconds.
        min_duration: Minimum estimated narration length in seconds.

    Returns:
        ``(post_content, top_comments)`` where ``top_comments`` is a
        non-empty list of cleaned comment strings.

    Raises:
        ValueError: If the post is NSFW, the post alone exceeds
            ``max_duration``, the content stays below ``min_duration``, or no
            usable comment was found.
    """
    # Bodies Reddit shows for deleted/removed comments. clean_text() strips the
    # brackets, so the bare words are what is normally seen here.
    placeholder_bodies = {"deleted", "[deleted]", "removed", "[removed]"}

    reddit = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=REDDIT_USER_AGENT,
    )

    # Fetch the submission (post) using the URL
    submission = reddit.submission(url=post_url)
    logger.info(f"Fetching post from URL: {post_url}")
    if submission.over_18:
        logger.warning("Post is marked as NSFW")
        raise ValueError("The post is marked as NSFW and cannot be processed.")

    # Clean and filter title and selftext
    title = filter_nsfw_words(clean_text(submission.title))
    selftext = filter_nsfw_words(clean_text(submission.selftext))
    post_content = f"{title}"
    if selftext:
        post_content += f". {selftext}"

    total_duration = estimate_audio_duration(post_content)
    logger.info(f"Post content duration: {total_duration}s")

    # A post that is too long on its own is rejected (it is not truncated)
    if total_duration > max_duration:
        logger.warning(f"Post content too long ({total_duration}s > {max_duration}s)")
        raise ValueError(f"Post content alone exceeds maximum duration ({total_duration}s > {max_duration}s)")

    # Fetch comments while respecting duration constraints
    top_comments = []
    submission.comments.replace_more(limit=0)

    # Keep collecting comments until we hit minimum duration or run out of comments
    for comment in submission.comments:
        if not isinstance(comment, praw.models.Comment):
            continue
        comment_text = clean_text(comment.body)
        # Skip deleted/removed comments and ones that are empty after cleaning
        normalized = comment_text.strip().lower()
        if not normalized or normalized in placeholder_bodies:
            continue
        # Filter NSFW words
        comment_text = filter_nsfw_words(comment_text)
        comment_duration = estimate_audio_duration(comment_text)
        logger.debug(f"Potential comment duration: {comment_duration}s")

        # Check if adding this comment would exceed max duration
        if total_duration + comment_duration > max_duration:
            if total_duration >= min_duration:
                logger.info(f"Reached sufficient duration ({total_duration}s), skipping remaining comments")
                break
            logger.debug(f"Comment would exceed max duration ({total_duration + comment_duration}s > {max_duration}s), but haven't reached min duration yet. Looking for shorter comments...")
            continue

        top_comments.append(comment_text)
        total_duration += comment_duration
        logger.debug(f"Added comment. New total duration: {total_duration}s")

        # If we've reached minimum duration, we can stop
        if total_duration >= min_duration:
            logger.info(f"Reached minimum duration ({total_duration}s)")
            break

    # Now check if we have enough content
    if total_duration < min_duration:
        logger.warning(f"Content too short ({total_duration}s < {min_duration}s)")
        raise ValueError(f"Content is too short ({total_duration}s) to generate audio in the desired duration range ({min_duration}s-{max_duration}s).")
    if not top_comments:
        logger.warning("No valid comments found")
        raise ValueError("No valid comments found in the post.")

    logger.info(f"Successfully fetched post with {len(top_comments)} comments. Total duration: {total_duration}s")
    return post_content, top_comments
def create_video_from_story(story_text, selected_voice, rate, pitch, background, pexels_keywords=None):
    """Turn free-form story text into a narrated, subtitled video.

    The text is cleaned and censored, converted to speech, transcribed into
    subtitle clips and composed over the chosen background.

    Args:
        story_text: Story to narrate.
        selected_voice: Voice passed to ``text_to_speech``.
        rate: Speaking-rate offset passed to ``text_to_speech``.
        pitch: Pitch offset passed to ``text_to_speech``.
        background: Background name passed to ``create_video_with_background``.
        pexels_keywords: Optional keywords forwarded for the Pexels background.

    Returns:
        ``(video_path, video_url, message)``; the first two are ``None`` when
        a step fails and ``message`` says which one.
    """
    cleaned_story = filter_nsfw_words(clean_text(story_text))
    audio_path = asyncio.run(text_to_speech(cleaned_story, voice=selected_voice, rate=rate, pitch=pitch))
    if not audio_path:
        return None, None, "Failed to generate audio from story"

    try:
        subtitles = generate_subtitles(audio_path)
        if subtitles is None:
            return None, None, "Failed to generate subtitles"

        timestamp = int(time.time())
        output_path = f"/tmp/story_{timestamp}.mp4"
        video_path, video_url = create_video_with_background(
            audio_path=audio_path,
            subtitles=subtitles,
            subreddit_url="story",
            selected_font="Mouldy Cheese",
            background=background,
            output_path=output_path,
            pexels_keywords=pexels_keywords,
        )
        if not video_path:
            return None, None, "Failed to create video"
        return video_path, video_url, "Video generated successfully from story!"
    finally:
        # Remove the intermediate audio on every exit path, not only on
        # success, so failed runs do not leave files behind.
        try:
            if os.path.exists(audio_path):
                os.remove(audio_path)
        except OSError as e:
            logger.warning(f"Failed to clean up audio: {e}")
def estimate_audio_duration(text, words_per_second=3.5, pause_per_sentence=1.0, pause_per_comment=1.5):
    """Estimate narration length in seconds from word count plus pauses.

    One sentence pause is added per ``.``, ``!`` or ``?`` character and one
    comment pause per occurrence of the literal ``". Comments: "``.
    """
    speaking_time = len(text.split()) / words_per_second
    sentence_pauses = sum(text.count(mark) for mark in ".!?") * pause_per_sentence
    comment_pauses = text.count('. Comments: ') * pause_per_comment  # comment transitions
    return speaking_time + sentence_pauses + comment_pauses
async def text_to_speech(text, voice="en-US-GuyNeural - Male (American)", rate=0, pitch=0):
    """Convert text to speech using edge-tts and return the MP3 path.

    Args:
        text: Text to speak.
        voice: A bare voice ID, or a display string of the form
            ``"<voice id> - <description>"`` of which only the ID is used.
        rate: Speaking-rate offset in percent; floats are rounded.
        pitch: Pitch offset in Hz; floats are rounded.

    Returns:
        Path of the generated file under ``temp/``, or ``None`` on any failure
        (the error is logged).
    """
    try:
        # Extract voice ID from the display string (e.g., "en-US-GuyNeural - Male (American)" -> "en-US-GuyNeural")
        voice_id = voice.split(" - ")[0] if " - " in voice else voice

        # edge-tts expects signed integer offsets; "+d" rejects floats, so
        # round first in case the caller passes a float.
        rate_str = f"{int(round(rate)):+d}%"
        pitch_str = f"{int(round(pitch)):+d}Hz"

        # Create output directory if it doesn't exist
        os.makedirs("temp", exist_ok=True)

        # Nanosecond timestamp: two requests in the same second no longer
        # write to the same file.
        output_file = f"temp/speech_{time.time_ns()}.mp3"

        logger.info(f"Generating TTS with voice: {voice_id}, rate: {rate_str}, pitch: {pitch_str}")

        # Configure voice settings
        communicate = edge_tts.Communicate(text, voice_id, rate=rate_str, volume='+0%', pitch=pitch_str)

        # Convert to audio
        await communicate.save(output_file)

        if os.path.exists(output_file):
            logger.info(f"Successfully generated audio file: {output_file}")
            return output_file
        logger.error("Failed to generate audio file")
        return None
    except Exception as e:
        logger.error(f"Error in text_to_speech: {e}")
        return None
def generate_subtitles(audio_path):
    """Transcribe *audio_path* with AssemblyAI and build one subtitle clip per spoken word.

    Each clip is a transparent 1000x150 image showing the current word with up
    to two words of context on either side. The current word is yellow, the
    rest white. Clips are timed from the transcript's word start/end values
    (divided by 1000 to get seconds).

    Args:
        audio_path: Path of the narration audio file.

    Returns:
        A list of positioned, timed ``ImageClip`` objects, or ``None`` if the
        path or API key is missing, the transcript has no words, no clip
        could be built, or any other error occurs (errors are logged).
    """
    try:
        if not audio_path or not os.path.exists(audio_path):
            logger.error(f"Invalid audio path: {audio_path}")
            return None

        # Initialize AssemblyAI
        aai.settings.api_key = os.getenv("ASSEMBLYAI_API_KEY")
        if not aai.settings.api_key:
            logger.error("AssemblyAI API key not found")
            return None

        transcriber = aai.Transcriber()
        logger.info(f"Uploading audio file: {audio_path}")
        with open(audio_path, "rb") as audio_file:
            transcript = transcriber.transcribe(audio_file)
            transcript.wait_for_completion()

        words = transcript.words
        if not words:
            logger.error("No words found in transcript")
            return None
        logger.info(f"Received {len(words)} words from transcription")

        # Load the subtitle font once instead of once per word.
        try:
            font = ImageFont.truetype(FONT_PATH, 72)
        except Exception:
            logger.warning(f"Failed to load font from {FONT_PATH}, using default")
            font = ImageFont.load_default()

        def create_text_image(context_words, highlight_index, size=(1000, 150)):
            """Render *context_words* on a transparent canvas, highlighting one position."""
            # RGBA canvas with a fully transparent background
            image = Image.new('RGBA', size, (0, 0, 0, 0))
            draw = ImageDraw.Draw(image)

            # The bracket markers are only measured, never drawn. They are kept
            # so the computed text position matches the previous layout.
            measured_text = " ".join(
                f"[{word}]" if idx == highlight_index else word
                for idx, word in enumerate(context_words)
            )
            bbox = draw.textbbox((0, 0), measured_text, font=font)
            text_width = bbox[2] - bbox[0]
            text_height = bbox[3] - bbox[1]

            # Calculate position to center the text
            x = (size[0] - text_width) // 2
            y = (size[1] - text_height) // 2

            # Draw each word; only the word at highlight_index is yellow, so a
            # repeated word elsewhere in the window stays white.
            current_x = x
            for idx, word in enumerate(context_words):
                word_with_space = word + " "
                word_bbox = draw.textbbox((0, 0), word_with_space, font=font)
                if idx == highlight_index:
                    color = (255, 255, 0, 255)
                else:
                    color = (255, 255, 255, 255)
                draw.text((current_x, y), word_with_space, font=font, fill=color)
                current_x += word_bbox[2] - word_bbox[0]

            # RGBA pixel array (alpha channel included)
            return np.array(image)

        subtitles = []
        window_size = 4  # window_size // 2 context words on each side of the current word
        for i, current_word in enumerate(words):
            try:
                # Get context words (previous and next words)
                start_idx = max(0, i - window_size // 2)
                end_idx = min(len(words), i + window_size // 2 + 1)
                context_words = [w.text for w in words[start_idx:end_idx]]

                # Create image with text; i - start_idx is the current word's slot
                img_array = create_text_image(context_words, i - start_idx)

                # Convert to ImageClip
                clip = ImageClip(img_array)

                # Set timing
                start_time = float(current_word.start) / 1000
                duration = float(current_word.end - current_word.start) / 1000

                # Position and time the clip
                clip = (
                    clip
                    .set_position(('center', 'center'))
                    .set_start(start_time)
                    .set_duration(duration)
                )
                subtitles.append(clip)
                logger.info(f"Successfully created clip for word: {current_word.text}")
            except Exception as clip_error:
                logger.error(f"Failed to create clip for word '{current_word.text}'. Error: {clip_error}")
                continue

        if not subtitles:
            logger.error("No valid subtitle clips were created")
            return None

        logger.info(f"Successfully generated {len(subtitles)} subtitle clips")
        return subtitles
    except Exception as e:
        logger.error(f"Error generating subtitles: {e}")
        logger.exception("Full traceback:")
        return None
def is_post_url(url):
    """Check if the URL is a direct post URL.

    Accepts ``reddit.com`` with an optional single subdomain (``www.``,
    ``old.``, ``new.``, ``np.`` ...) followed by
    ``/r/<subreddit>/comments/<id>``.

    Args:
        url: Candidate URL; falsy values yield ``False``.

    Returns:
        ``True`` if *url* starts with a Reddit post URL, else ``False``.
    """
    if not url:
        return False
    return bool(re.match(r'https?://(?:[\w-]+\.)?reddit\.com/r/\w+/comments/\w+/?', url))
def generate_audio_from_reddit(url, filter_type, time_filter, selected_voice, rate, pitch):
    """Fetch Reddit content for *url* and narrate it to an audio file.

    A direct post URL is fetched as-is; any other URL is treated as a
    subreddit and a post is chosen using ``filter_type`` / ``time_filter``.

    Args:
        url: Reddit post URL or subreddit URL.
        filter_type: Listing type for subreddit URLs.
        time_filter: Time filter for subreddit URLs.
        selected_voice: Voice passed to ``text_to_speech``.
        rate: Speaking-rate offset passed to ``text_to_speech``.
        pitch: Pitch offset passed to ``text_to_speech``.

    Returns:
        Path of the generated audio file, or ``None`` on any failure (the
        error is logged).
    """
    try:
        # Get content based on URL type
        try:
            if is_post_url(url):
                logger.info("Processing direct post URL...")
                post_content, comments = fetch_post_and_comments_from_url(url)
            else:
                logger.info("Processing subreddit URL...")
                post_content, comments = fetch_top_post_and_comments(url, filter_type, time_filter)
        except ValueError as e:
            logger.error(f"Error fetching content: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error fetching content: {e}")
            return None

        if not post_content or not comments:
            logger.error("Failed to get Reddit content")
            return None

        # Combine content: post, spoken transition, then each comment as its
        # own sentence. The space after the colon keeps the first comment
        # from being glued to "Listen:".
        spoken_comments = "".join(f"{comment}. " for comment in comments)
        combined_content = f"{post_content}. Hey, Listen: {spoken_comments}"

        # Generate audio
        audio_path = asyncio.run(text_to_speech(combined_content, voice=selected_voice, rate=rate, pitch=pitch))
        if not audio_path:
            logger.error("Failed to generate audio")
            return None

        logger.info(f"Successfully generated audio at: {audio_path}")
        return audio_path
    except Exception as e:
        logger.error(f"Error in generate_audio_from_reddit: {e}")
        return None
def search_pexels_video(query, page=1, per_page=5):
    """Search Pexels for *query* and return the download link of one video file.

    A random video is picked from the requested result page. Among its files,
    those at least 720 px wide are preferred (all files are used if none
    qualify) and the one with the largest pixel area is returned.

    Args:
        query: Search keyword.
        page: Result page to request.
        per_page: Number of results per page.

    Returns:
        The selected file's URL, or ``None`` if the API key is missing, the
        request fails, or no usable video file is found.
    """
    if not PEXELS_API_KEY:
        logger.error("PEXELS_API_KEY is missing from environment variables")
        return None

    logger.info(f"Searching Pexels for: '{query}', page: {page}, per_page: {per_page}")
    headers = {"Authorization": PEXELS_API_KEY}
    params = {"query": query, "per_page": per_page, "page": page}
    try:
        # Bounded wait so a stalled connection cannot hang the whole request.
        response = requests.get(
            "https://api.pexels.com/videos/search",
            headers=headers,
            params=params,
            timeout=30,
        )
    except requests.exceptions.RequestException as e:
        logger.error(f"Pexels API request failed for query '{query}': {e}")
        return None

    logger.debug(f"Pexels API response status: {response.status_code} for query '{query}' page {page}")
    if response.status_code != 200:
        logger.error(f"Pexels API error: {response.text}")
        return None

    try:
        data = response.json()
    except ValueError as e:
        logger.error(f"Pexels API returned invalid JSON for query '{query}': {e}")
        return None

    videos_on_page = data.get("videos", [])
    logger.info(f"Found {len(videos_on_page)} videos for query: {query}")
    if not videos_on_page:
        logger.warning(f"No videos found on page {page} for query: {query}")
        return None

    # Pick a random video from this page for variety rather than always the first.
    selected_video_data = random.choice(videos_on_page)
    video_files = selected_video_data.get("video_files", [])
    if not video_files:
        logger.warning(f"Selected video (ID: {selected_video_data.get('id')}) has no video_files.")
        return None

    def _width(video_file):
        # A missing or null dimension counts as 0 instead of raising on comparison.
        return video_file.get("width") or 0

    def _area(video_file):
        return _width(video_file) * (video_file.get("height") or 0)

    # Prefer files at least 720 px wide; fall back to all files if none qualify.
    candidates = [f for f in video_files if _width(f) >= 720] or video_files
    # Largest pixel area wins; max() keeps the first file on ties.
    best_file = max(candidates, key=_area)

    link = best_file.get("link")
    if not link:
        logger.warning(f"No suitable video files after filtering for video ID: {selected_video_data.get('id')}")
        return None

    logger.info(f"Selected video file: {link} ({best_file.get('width')}x{best_file.get('height')}) for query '{query}' page {page}")
    return link
def _fetch_and_process_single_pexels_video(keyword_query, page_number, target_resolution=(1080, 1920)):
    """Download one Pexels video for a keyword and fit it to the target frame.

    The clip is limited to its first 3 seconds, scaled so it covers
    ``target_resolution`` and centre-cropped to exactly that size.

    Args:
        keyword_query: Search keyword.
        page_number: Pexels result page to search.
        target_resolution: ``(width, height)`` of the output frame.

    Returns:
        ``(clip, temp_path)`` on success. The temporary file is NOT deleted
        here because the returned clip still reads from it; the caller owns
        both. ``(None, None)`` on any failure, in which case the temporary
        file has been removed.
    """
    # Keep search failures inside this function's (None, None) contract so a
    # single bad lookup cannot abort the caller's whole collection loop.
    try:
        url = search_pexels_video(keyword_query, page=page_number)
    except Exception as e:
        logger.error(f"Pexels search failed for '{keyword_query}': {e}")
        return None, None
    if not url:
        logger.warning(f"No Pexels URL found for '{keyword_query}'.")
        return None, None

    # Create a unique temporary file path
    temp_dir = "/tmp"
    os.makedirs(temp_dir, exist_ok=True)
    temp_fd, temp_path = tempfile.mkstemp(suffix=".mp4", dir=temp_dir, prefix="pexels_")
    os.close(temp_fd)  # only the path is needed

    def _discard_temp_file():
        if os.path.exists(temp_path):
            os.remove(temp_path)

    video_clip_obj = None
    try:
        logger.info(f"Downloading Pexels clip from {url} to {temp_path}")
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(temp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        logger.info(f"Pexels clip downloaded to {temp_path}")

        if not os.path.exists(temp_path) or os.path.getsize(temp_path) == 0:
            logger.error(f"Downloaded Pexels clip is missing or empty: {temp_path}")
            _discard_temp_file()
            return None, None

        video_clip_obj = VideoFileClip(temp_path)

        # Use at most the first 3 seconds of each stock clip.
        if video_clip_obj.duration > 3.0:
            logger.info(f"Original Pexels clip for '{keyword_query}' is {video_clip_obj.duration:.2f}s long. Subclipping to 3.0s.")
            active_segment = video_clip_obj.subclip(0, 3.0)
        else:
            active_segment = video_clip_obj

        # Scale so the clip covers the target frame, then centre-crop the excess.
        target_width, target_height = target_resolution[0], target_resolution[1]
        if active_segment.w / active_segment.h > target_width / target_height:
            resized_segment = active_segment.resize(height=target_height)
        else:
            resized_segment = active_segment.resize(width=target_width)
        final_processed_segment = resized_segment.crop(
            x_center=resized_segment.w / 2,
            y_center=resized_segment.h / 2,
            width=target_width,
            height=target_height,
        )

        # video_clip_obj is deliberately left open: the derived clip may share
        # its reader, so the caller manages the clip and its source file.
        logger.info(f"Pexels clip for '{keyword_query}' downloaded and processed. Path: {temp_path}, Final Segment Duration: {final_processed_segment.duration:.2f}s")
        return final_processed_segment, temp_path
    except requests.exceptions.RequestException as e:
        logger.error(f"Failed to download Pexels video for '{keyword_query}': {e}")
        _discard_temp_file()
        return None, None
    except Exception as e:
        logger.error(f"Failed to process Pexels video: {e}")
        if video_clip_obj:
            video_clip_obj.close()  # opened before the failure
        _discard_temp_file()
        return None, None
def _solid_background(color, duration, target_resolution):
    """Return ``(clip, [])`` for a solid *color* background lasting *duration* seconds."""
    return ColorClip(size=target_resolution, color=color).set_duration(duration), []


def _release_pexels_resources(clips, temp_paths):
    """Close collected Pexels clips and delete their downloaded files (best effort)."""
    for clip in clips:
        try:
            clip.close()
        except Exception as e:
            logger.warning(f"Failed to close Pexels clip: {e}")
    for path in temp_paths:
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError as e:
            logger.warning(f"Failed to delete temporary file {path}: {e}")


def _build_pexels_background(pexels_keywords, duration, target_resolution):
    """Collect Pexels clips round-robin over the keywords until *duration* is covered."""
    MAX_FAILURES_PER_KEYWORD_TOTAL = 3  # failures since the last success before a keyword is dropped
    keywords = [kw for kw in pexels_keywords if kw.strip()]  # ignore blank entries
    page_by_keyword = {kw: 1 for kw in keywords}  # next result page to try per keyword
    failures_by_keyword = {kw: 0 for kw in keywords}
    active_keywords = list(keywords)
    collected_clips = []
    temp_files_to_delete_later = []
    current_total_duration = 0.0

    try:
        while current_total_duration < duration and active_keywords:
            got_clip_this_cycle = False
            for keyword in list(active_keywords):  # iterate over a copy for safe removal
                if current_total_duration >= duration:
                    break
                if failures_by_keyword[keyword] >= MAX_FAILURES_PER_KEYWORD_TOTAL:
                    if keyword in active_keywords:
                        active_keywords.remove(keyword)
                    continue

                current_page = page_by_keyword[keyword]
                logger.info(f"Trying keyword '{keyword}', page {current_page}. Total duration: {current_total_duration:.2f}s / {duration:.2f}s")
                clip_segment, temp_file_path = _fetch_and_process_single_pexels_video(keyword, current_page, target_resolution)
                page_by_keyword[keyword] += 1  # always advance to the next page for this keyword

                if clip_segment and temp_file_path:
                    collected_clips.append(clip_segment)
                    temp_files_to_delete_later.append(temp_file_path)
                    current_total_duration += clip_segment.duration
                    failures_by_keyword[keyword] = 0  # reset failure count on success
                    got_clip_this_cycle = True
                    logger.info(f"Added clip for '{keyword}' (page {current_page}). Segment: {clip_segment.duration:.2f}s. Total: {current_total_duration:.2f}s")
                else:
                    logger.warning(f"No clip found for '{keyword}' on page {current_page}.")
                    failures_by_keyword[keyword] += 1
                    if failures_by_keyword[keyword] >= MAX_FAILURES_PER_KEYWORD_TOTAL:
                        logger.info(f"Max total failures reached for keyword '{keyword}'. Removing from active list.")
                        if keyword in active_keywords:
                            active_keywords.remove(keyword)

            if not got_clip_this_cycle and active_keywords:
                logger.info("A full cycle through active keywords yielded no new clips. Stopping Pexels search.")
                break

        if not collected_clips:
            logger.warning("No Pexels clips were collected. Using solid green background.")
            return _solid_background((0, 255, 0), duration, target_resolution)

        logger.info(f"Concatenating {len(collected_clips)} Pexels clips.")
        final_pexels_video = concatenate_videoclips(collected_clips, method="compose")

        if final_pexels_video.duration == 0:  # should not happen when clips were collected
            logger.error("Concatenated Pexels video has zero duration. This should not happen if clips were collected. Using solid green.")
            _release_pexels_resources(collected_clips, temp_files_to_delete_later)
            return _solid_background((0, 255, 0), duration, target_resolution)

        if final_pexels_video.duration > duration:
            logger.info(f"Subclipping concatenated Pexels video (duration {final_pexels_video.duration:.2f}s) to target duration ({duration:.2f}s)")
            final_pexels_video = final_pexels_video.subclip(0, duration)
        else:
            logger.info(f"Collected Pexels video duration {final_pexels_video.duration:.2f}s. Target audio duration is {duration:.2f}s. Video will be adjusted to audio duration during final composition if shorter.")
        return final_pexels_video, temp_files_to_delete_later
    except Exception:
        # The caller falls back to a solid background and never sees these
        # files, so release them here before propagating the error.
        _release_pexels_resources(collected_clips, temp_files_to_delete_later)
        raise


def _build_local_background(video_path, duration, target_resolution):
    """Cut a random *duration*-second window from a local video and fit it to the frame."""
    logger.info(f"Loading background video: {video_path}")
    # NOTE: 'video' stays open because the returned clip is derived from it.
    video = VideoFileClip(video_path)

    # subclip() does not clamp its end time, so a source shorter than the
    # audio is used in full and looped instead of being read past its end.
    needs_loop = video.duration < duration
    max_start = max(0, video.duration - duration)
    start_time = random.uniform(0, max_start) if max_start > 0 else 0
    logger.info(f"Selected start time: {start_time:.2f}s")
    end_time = video.duration if needs_loop else start_time + duration
    clip = video.subclip(start_time, end_time)

    # Centre-crop to the target aspect ratio, then scale to the exact size.
    target_aspect = target_resolution[0] / target_resolution[1]
    if clip.w / clip.h > target_aspect:
        new_width = int(clip.h * target_aspect)
        x_center = clip.w // 2
        clip = clip.crop(x1=x_center - (new_width // 2), x2=x_center + (new_width // 2))
    else:
        new_height = int(clip.w / target_aspect)
        y_center = clip.h // 2
        clip = clip.crop(y1=y_center - (new_height // 2), y2=y_center + (new_height // 2))
    clip = clip.resize(target_resolution)

    if needs_loop:
        clip = clip.loop(duration=duration)

    logger.info("Background video clip created successfully")
    return clip, []


def get_video_clip(background_type, duration, target_resolution=(1080, 1920), pexels_keywords=None):
    """Build the background clip for a video of *duration* seconds.

    Args:
        background_type: ``"Pexels"``, ``"Black"``, ``"Green"`` or the name of
            a bundled video (``"Minecraft"``, ``"Cake Making"``,
            ``"Satisfying ART"``).
        duration: Required clip length in seconds.
        target_resolution: ``(width, height)`` of the output frame.
        pexels_keywords: List of search keywords; only used for ``"Pexels"``.

    Returns:
        ``(clip, temp_files)``. ``temp_files`` lists downloaded Pexels files
        the caller must delete once the clip is no longer needed, and is empty
        for every other background. A solid green clip is returned for unknown
        backgrounds, missing files or any error.
    """
    logger.info(f"[DEBUG] background_type: '{background_type}', pexels_keywords: '{pexels_keywords}'")
    try:
        if background_type == "Pexels":
            if not pexels_keywords or not isinstance(pexels_keywords, list) or not any(kw.strip() for kw in pexels_keywords):
                logger.warning("No valid Pexels keywords provided or list is empty. Using solid green background.")
                return _solid_background((0, 255, 0), duration, target_resolution)
            return _build_pexels_background(pexels_keywords, duration, target_resolution)

        # Handle solid color backgrounds
        if background_type == "Black":
            logger.info("Creating solid black background")
            return _solid_background((0, 0, 0), duration, target_resolution)
        if background_type == "Green":
            logger.info("Creating solid green background")
            return _solid_background((0, 255, 0), duration, target_resolution)

        # Predefined video backgrounds, stored next to this script
        script_dir = os.path.dirname(os.path.abspath(__file__))
        video_paths = {
            "Minecraft": os.path.join(script_dir, "Minecraft.mp4"),
            "Cake Making": os.path.join(script_dir, "A Collection OF CAKE Oddly Satisfying Chocolate Cake You Never Seen _ Awesome Cake Decorating Ideas.mp4"),
            "Satisfying ART": os.path.join(script_dir, "TOP 80 Satisfying Art Videos _ Best of The Year Quantastic.mp4"),
        }
        if background_type not in video_paths:
            logger.error(f"Invalid background type: {background_type}")
            logger.info(f"Available backgrounds: {list(video_paths.keys())}")
            return _solid_background((0, 255, 0), duration, target_resolution)  # default to green

        video_path = video_paths[background_type]
        if not os.path.exists(video_path):
            logger.error(f"Background video not found: {video_path}")
            logger.info(f"Looking in directory: {script_dir}")
            logger.info(f"Available files: {os.listdir(script_dir)}")
            return _solid_background((0, 255, 0), duration, target_resolution)

        return _build_local_background(video_path, duration, target_resolution)
    except Exception as e:
        logger.error(f"Error creating video background: {e}")
        logger.exception("Full traceback:")
        return ColorClip(size=target_resolution, color=(0, 255, 0)).set_duration(duration), []
def _close_clip_quietly(clip):
    """Close a MoviePy clip, logging instead of raising so later cleanup steps still run."""
    if clip is None:
        return
    try:
        clip.close()
    except Exception as e:
        logger.warning(f"Error during cleanup: {e}")


def _remove_pexels_temp_files(temp_file_paths):
    """Delete Pexels temp files that still exist; failures are logged, not raised."""
    for temp_f_path in temp_file_paths:
        if not os.path.exists(temp_f_path):
            continue
        try:
            os.remove(temp_f_path)
            logger.info(f"Cleaned up Pexels temp file: {temp_f_path}")
        except OSError as e_del:
            logger.warning(f"Error deleting Pexels temp file {temp_f_path}: {e_del}")


def create_video_with_background(audio_path, subtitles, subreddit_url, selected_font="Mouldy Cheese", background="Green", output_path="/tmp/output_video.mp4", pexels_keywords=None):
    """Render subtitles over a background with the given audio, then upload the result.

    Args:
        audio_path: Path of an existing audio file; its duration sets the video length.
        subtitles: List of clips composited on top of the background. They are
            closed by this function once rendering has finished or failed.
        subreddit_url: Unused here; kept so existing callers keep working.
        selected_font: Unused here; kept so existing callers keep working.
        background: Background name passed to ``get_video_clip``.
        output_path: Where the rendered MP4 is written.
        pexels_keywords: Keywords forwarded to ``get_video_clip``.

    Returns:
        ``(output_path, video_url)`` on success, ``(None, None)`` on any failure
        (invalid input, rendering error, or upload error).
    """
    logger.info(f"[DEBUG] create_video_with_background received pexels_keywords: '{pexels_keywords}'")

    # Initialised up front so every cleanup path can rely on these names existing,
    # no matter how far the pipeline got before failing.
    pexels_temp_files_to_clean = []
    audio = None
    background_clip = None
    video = None

    try:
        logger.info("Starting video creation process...")

        # Validate audio path
        if not isinstance(audio_path, str) or not os.path.exists(audio_path):
            logger.error(f"Invalid audio path: {audio_path}")
            return None, None

        # Validate subtitles
        if not isinstance(subtitles, list):
            logger.error(f"Invalid subtitles format: {type(subtitles)}")
            return None, None

        try:
            logger.info(f"Loading audio file: {audio_path}")
            try:
                audio = AudioFileClip(audio_path)
                duration = audio.duration
                logger.info(f"Audio duration: {duration} seconds")
            except Exception as e:
                logger.error(f"Error loading audio file: {e}")
                return None, None

            try:
                logger.info(f"Creating background clip with {background} background...")
                result_bg = get_video_clip(background, duration, target_resolution=(1080, 1920), pexels_keywords=pexels_keywords)
                if isinstance(result_bg, tuple):
                    # (clip, temp_files): the temp files must outlive rendering.
                    background_clip, temp_files = result_bg
                    pexels_temp_files_to_clean = list(temp_files or [])
                else:
                    # Bare clip: nothing was downloaded, the temp-file list stays empty.
                    background_clip = result_bg
                if background_clip is None:
                    logger.error("Failed to create background clip")
                    return None, None
                logger.info("Background clip created successfully")
            except Exception as e:
                logger.error(f"Error creating background clip: {e}")
                return None, None

            try:
                logger.info(f"Combining {len(subtitles)} subtitle clips with background...")
                final_clips = [background_clip] + subtitles
                video = CompositeVideoClip(final_clips, size=(1080, 1920))
                logger.info("Clips combined successfully")
            except Exception as e:
                logger.error(f"Error combining clips: {e}")
                return None, None

            try:
                logger.info("Setting audio to video...")
                video = video.set_audio(audio)
                logger.info("Audio set successfully")
            except Exception as e:
                logger.error(f"Error setting audio: {e}")
                return None, None

            try:
                logger.info(f"Writing video to file: {output_path}")
                video.write_videofile(
                    output_path,
                    fps=30,
                    codec='libx264',
                    audio_codec='aac',
                    temp_audiofile='temp-audio.m4a',
                    remove_temp=True,
                    logger=None
                )
                logger.info("Video written successfully")
            except Exception as e:
                logger.error(f"Error writing video file: {e}")
                logger.exception("Full traceback:")
                return None, None
        finally:
            # Runs on success and on every early return above. Each clip is closed
            # independently so one failing close() cannot skip the others.
            _close_clip_quietly(video)
            _close_clip_quietly(audio)
            _close_clip_quietly(background_clip)
            if background == "Pexels":
                # A composite Pexels background exposes its segments through `.clips`.
                pexels_segments = getattr(background_clip, 'clips', None) or []
                if pexels_segments:
                    logger.info(f"Attempting to close {len(pexels_segments)} Pexels sub-clips.")
                for pexels_segment_clip in pexels_segments:
                    _close_clip_quietly(pexels_segment_clip)
            for subtitle_clip in subtitles:
                _close_clip_quietly(subtitle_clip)
            # Temp files are removed only after the clips reading them are closed.
            _remove_pexels_temp_files(pexels_temp_files_to_clean)

        if not os.path.exists(output_path):
            logger.error("Video file not found after creation")
            return None, None

        logger.info(f"Video successfully created at: {output_path}")

        # Upload to Hugging Face Dataset
        hf_api = HfApi()
        dataset_repo = "lolhaha002/redditbotdata"  # Change this to your dataset name
        logger.info(f"Uploading video to Hugging Face Dataset: {output_path}")
        video_filename = os.path.basename(output_path)
        hf_api.upload_file(
            path_or_fileobj=output_path,
            path_in_repo=f"videos/{video_filename}",
            repo_id=dataset_repo,
            repo_type="dataset"
        )

        # Generate public URL
        video_url = f"https://huggingface.co/datasets/{dataset_repo}/resolve/main/videos/{video_filename}"
        logger.info(f"Video uploaded successfully: {video_url}")
        return output_path, video_url
    except Exception as e:
        # Reached for errors outside the guarded steps above (e.g. a failed upload).
        # Clips and temp files have already been released by the inner `finally`.
        logger.error(f"Unexpected error in video creation: {e}")
        logger.exception("Full traceback:")
        return None, None
def _parse_pexels_keywords(raw_keywords):
    """Split a comma-separated string into stripped, non-empty keywords ([] for non-strings)."""
    if not isinstance(raw_keywords, str):
        return []
    return [kw.strip() for kw in raw_keywords.split(",") if kw.strip()]


def tts_interface(subreddit_url, story_text, filter_type, time_filter, selected_voice, rate, pitch, background, pexels_keywords):
    """Gradio handler: turn a pasted story or a Reddit URL into a narrated video.

    Args:
        subreddit_url: Subreddit or post URL; used only when ``story_text`` is empty.
        story_text: User-supplied story; takes precedence over ``subreddit_url``.
        filter_type: Listing filter forwarded to ``generate_audio_from_reddit``.
        time_filter: Time window forwarded to ``generate_audio_from_reddit``.
        selected_voice: Voice choice forwarded to the audio/story generators.
        rate: Voice speed forwarded to the audio/story generators.
        pitch: Voice pitch forwarded to the audio/story generators.
        background: Background name forwarded to the video builder.
        pexels_keywords: Comma-separated keyword string from the UI (may be None).

    Returns:
        In Reddit mode, ``(video_path, video_url, status_message)``, with the first
        two set to ``None`` on failure. In story mode, whatever
        ``create_video_from_story`` returns.
    """
    logger.info(f"[DEBUG] tts_interface received pexels_keywords: '{pexels_keywords}'")
    # Tracked outside the try so the temporary audio file is removed on every exit path.
    audio_path = None
    try:
        logger.info("Starting TTS interface process...")
        logger.info(f"Selected background: {background}")

        # Story Logic
        story_text = (story_text or "").strip()
        subreddit_url = (subreddit_url or "").strip()
        if not story_text and not subreddit_url:
            return None, None, "Please provide either a story or a Reddit URL."

        if story_text:
            keywords_list_story = _parse_pexels_keywords(pexels_keywords)
            logger.info(f"[DEBUG] Parsed keywords list for story: {keywords_list_story}")
            return create_video_from_story(story_text, selected_voice, rate, pitch, background, pexels_keywords=keywords_list_story)

        # Generate audio
        logger.info("Generating audio from Reddit content...")
        audio_path = generate_audio_from_reddit(subreddit_url, filter_type, time_filter, selected_voice, rate, pitch)
        if not audio_path or not isinstance(audio_path, str):
            logger.error(f"Invalid audio path returned: {audio_path}")
            return None, None, "Failed to generate audio: Content not suitable or contains NSFW material"
        logger.info(f"Audio generated successfully: {audio_path}")

        # Generate subtitles
        logger.info("Generating subtitles")
        subtitles = generate_subtitles(audio_path)
        if subtitles is None:
            logger.error("Failed to generate subtitles")
            return None, None, "Failed to generate subtitles"
        logger.info(f"Generated {len(subtitles)} subtitle clips")

        # Create unique output path. The name comes from user input and ends up in a
        # file path and an upload path, so keep only the leading run of safe characters
        # (this also drops query strings such as "?sort=top").
        timestamp = int(time.time())
        subreddit_name = "unknown"
        if "reddit.com/r/" in subreddit_url:
            raw_name = subreddit_url.split("reddit.com/r/")[-1].split("/")[0]
            safe_name = re.match(r"[A-Za-z0-9_]+", raw_name)
            if safe_name:
                subreddit_name = safe_name.group(0)
        video_filename = f"{subreddit_name}_{timestamp}.mp4"
        output_path = f"/tmp/{video_filename}"
        logger.info(f"Creating video with output path: {output_path}")

        # Create video
        logger.info(f"[DEBUG] Raw pexels_keywords input: '{pexels_keywords}'")
        keywords_list = _parse_pexels_keywords(pexels_keywords)
        logger.info(f"[DEBUG] Parsed keywords list: {keywords_list}")
        if not keywords_list:
            logger.warning("No valid Pexels keywords found after parsing.")
        else:
            for i, kw in enumerate(keywords_list, start=1):
                logger.info(f"[DEBUG] Keyword {i}: '{kw}'")

        video_path, video_url = create_video_with_background(
            audio_path=audio_path,
            subtitles=subtitles,
            subreddit_url=subreddit_url,
            selected_font="Mouldy Cheese",
            background=background,
            output_path=output_path,
            pexels_keywords=keywords_list
        )
        if video_path is None:
            logger.error("Failed to create video")
            return None, None, "Failed to create video"

        logger.info(f"Video created at: {video_path}")
        logger.info(f"Video URL: {video_url}")
        logger.info("Video generation process completed successfully")
        return video_path, video_url, "Video generated and uploaded successfully!"
    except Exception as e:
        logger.error(f"Error in TTS interface: {e}")
        logger.exception("Full traceback:")
        return None, None, f"Error: {str(e)}"
    finally:
        # Clean up the audio file only; the video file is kept for the preview.
        # Runs on failures too, so a failed run does not leave audio behind in /tmp.
        if isinstance(audio_path, str):
            try:
                if os.path.exists(audio_path):
                    os.remove(audio_path)
                    logger.info(f"Cleaned up temporary audio file: {audio_path}")
            except Exception as e:
                logger.warning(f"Failed to clean up temporary files: {e}")
# Script entry point: build the Gradio UI and launch it. Nothing below runs on import.
if __name__ == "__main__":
with gr.Blocks() as demo:
# Intro text shown at the top of the page.
gr.Markdown("""
# Reddit to Video Generator
Enter either:
- A subreddit URL (e.g., https://www.reddit.com/r/AskReddit/) to get top posts
- A direct post URL (e.g., https://www.reddit.com/r/AskReddit/comments/abc123/post_title/) to use that specific post
If you are looking for a bulk shorts creator and/or wants to provide a support, please checkout my fiverr gigs https://www.fiverr.com/s/dDdbGXZ
""")
with gr.Row():
# Input column: content source, voice settings and background options.
with gr.Column():
url_input = gr.Textbox(
label="Reddit URL",
placeholder="Enter subreddit URL or direct post URL"
)
# Free-form story; tts_interface uses it instead of the Reddit URL when non-empty.
story_input = gr.Textbox(
label="Or Enter Your Own Story",
placeholder="Paste or write your story here (no Reddit needed)",
lines=6
)
word_count_display = gr.Textbox(label="Word Count", interactive=False)
# Refresh the read-only word count whenever the story text changes
# (count_words is defined elsewhere in this file).
story_input.change(
fn=count_words,
inputs=story_input,
outputs=word_count_display
)
filter_type = gr.Dropdown(
["hot", "top", "new"],
label="Filter Type (for subreddit URLs only)",
value="hot"
)
time_filter = gr.Dropdown(
["hour", "day", "week", "month", "year", "all"],
label="Time Filter (for subreddit URLs only)",
value="day"
)
# Choices are rendered as "<key> - <description>" from VOICE_OPTIONS.
# NOTE(review): the default below presumably matches one of those entries - confirm.
selected_voice = gr.Dropdown(
choices=[f"{k} - {v}" for k, v in VOICE_OPTIONS.items()],
value="en-US-GuyNeural - Male (American)",
label="Voice"
)
rate = gr.Slider(
minimum=-100,
maximum=100,
value=0,
step=10,
label="Voice Speed"
)
pitch = gr.Slider(
minimum=-100,
maximum=100,
value=0,
step=10,
label="Voice Pitch"
)
background = gr.Dropdown(
list(BACKGROUND_OPTIONS.keys()),
label="Background",
value="Green" # Changed default to Green
)
# Comma-separated keywords; tts_interface splits them into a list.
pexels_keywords = gr.Textbox(
label="Pexels Keywords (for video background)",
placeholder="e.g., nature, city night, forest"
)
# Output column: rendered video, its dataset URL and a status message.
with gr.Column():
video_preview = gr.Video(label="Video Preview")
dataset_url = gr.Textbox(label="Dataset URL (Click to View)")
status_text = gr.Textbox(label="Status")
submit_btn = gr.Button("Generate Video")
# `inputs` is passed positionally, so its order must match tts_interface's parameter
# order; `outputs` receives its (video_path, video_url, status_message) result.
submit_btn.click(
fn=tts_interface,
inputs=[url_input, story_input, filter_type, time_filter, selected_voice, rate, pitch, background, pexels_keywords],
outputs=[video_preview, dataset_url, status_text],
queue=True
)
# Debug hook that only logs the current keyword text and updates no component.
# NOTE(review): no `triggers` are given; per the Gradio docs this presumably fires on
# app load and on every change to the input - confirm. The message is logged at DEBUG
# level, so it stays hidden while logging is configured at INFO.
@gr.on(inputs=[pexels_keywords])
def debug_pexels_input(text):
logger.debug(f"[DEBUGUI] Received Pexels keywords via UI event: '{text}'")
return None # Explicitly return None or omit return
# share=True asks Gradio to also create a public share link.
demo.launch(share=True)