"""Render a narrated vertical slideshow video from a text script and upload it.

Each sentence of the script becomes one clip: a stock photo fetched from the
Pexels API (or a plain dark background when no photo is available) with a
text-to-speech narration of that sentence. The clips are concatenated into a
single MP4 which is then handed to the project's ``upload`` module.

Usage:
    python <this_file> SCRIPT TITLE DESCRIPTION
"""
import asyncio
import os
import random
import re
import sys
import textwrap
import time
from io import BytesIO

import edge_tts
import numpy as np
import requests
from moviepy.editor import AudioFileClip, ImageClip, concatenate_videoclips
from PIL import Image, ImageDraw, ImageFont, ImageOps

import upload

# Output geometry: portrait frame for short-form video.
WIDTH, HEIGHT = 1080, 1920
FPS = 30
# Minimum number of seconds every clip stays on screen.
DURATION_PER_LINE = 4
# Upper bound on the number of sentences (and therefore clips) per video.
MAX_CLIPS = 10

PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', '')
PEXELS_SEARCH_URL = 'https://api.pexels.com/v1/search'
REQUEST_TIMEOUT = 15

VIDEO_PATH = '/app/video.mp4'
AUDIO_PATH_TEMPLATE = '/app/audio_{index}.mp3'

# Search terms cycled through, one per clip, so consecutive clips get
# different background photos.
KEYWORDS = [
    "india city",
    "india technology",
    "india traffic",
    "india government",
    "india future",
    "india electric car",
    "india startup",
    "india innovation",
    "india growth",
    "india 2026",
    "india street",
    "india economy",
]

# Applied in this order by clean_script(); each match is deleted.
_SCRIPT_NOISE_PATTERNS = tuple(
    re.compile(pattern)
    for pattern in (
        r'\*\*.*?\*\*',                                 # **bold** spans
        r'\(.*?\)',                                     # (parenthesised asides)
        r'Host:|Expert:|Narrator:|Title:|Script:',      # speaker / section labels
        r'\[.*?\]',                                     # [bracketed directions]
        r'#\w+',                                        # hashtags
    )
)


def clean_script(text):
    """Strip markup that should not be narrated from *text*.

    Removes ``**bold**`` spans, parenthesised and bracketed asides, the
    speaker/section labels ``Host:``, ``Expert:``, ``Narrator:``, ``Title:``
    and ``Script:``, and hashtags, then trims surrounding whitespace.
    """
    for pattern in _SCRIPT_NOISE_PATTERNS:
        text = pattern.sub('', text)
    return text.strip()


def get_pexels_image(keyword):
    """Fetch a random portrait photo for *keyword* from the Pexels search API.

    Returns:
        An RGB ``PIL.Image`` of exactly ``WIDTH x HEIGHT`` pixels, or ``None``
        when no API key is configured, the search has no results, or any
        network/decoding error occurs. Errors are printed, never raised, so a
        missing background never aborts video generation.
    """
    if not PEXELS_API_KEY:
        # Without a key the API can only answer with an auth error; skip the
        # round trip and let the caller fall back to the plain background.
        print("PEXELS_API_KEY is not set; skipping Pexels lookup")
        return None

    try:
        # `params=` lets requests URL-encode the keyword (they contain spaces).
        search = requests.get(
            PEXELS_SEARCH_URL,
            headers={'Authorization': PEXELS_API_KEY},
            params={'query': keyword, 'per_page': 10, 'orientation': 'portrait'},
            timeout=REQUEST_TIMEOUT,
        )
        # Surface 4xx/5xx as an error instead of parsing an error body.
        search.raise_for_status()
        photos = search.json().get('photos')
        if not photos:
            print(f"No photos for: {keyword}")
            return None

        photo_url = random.choice(photos)['src']['portrait']
        download = requests.get(photo_url, timeout=REQUEST_TIMEOUT)
        download.raise_for_status()

        img = Image.open(BytesIO(download.content)).convert('RGB')
        # Crop-to-fill rather than a plain resize: the downloaded photo need
        # not share the frame's 9:16 aspect ratio, and stretching distorts it.
        img = ImageOps.fit(img, (WIDTH, HEIGHT), method=Image.LANCZOS)
        print(f"Pexels image fetched for '{keyword}' ✅")
        return img
    except Exception as e:
        # Deliberately broad: the background image is best-effort.
        print(f"Pexels error: {e}")
        return None


def make_frame(bg_image=None):
    """Return the clip background as a NumPy array.

    Uses *bg_image* when given; otherwise a solid dark ``WIDTH x HEIGHT``
    RGB frame.
    """
    if bg_image is None:
        bg_image = Image.new('RGB', (WIDTH, HEIGHT), color=(15, 15, 25))
    # np.array() copies the pixel data, so the source image is left untouched.
    return np.array(bg_image)


def generate_tts(text, output_path):
    """Synthesise *text* to an audio file at *output_path* with edge-tts.

    Returns:
        ``True`` on success, ``False`` if synthesis raised (the error is
        printed).
    """
    async def speak():
        communicate = edge_tts.Communicate(text, voice="en-IN-NeerjaNeural")
        await communicate.save(output_path)

    try:
        asyncio.run(speak())
    except Exception as e:
        # Deliberately broad: a clip without narration is still usable.
        print(f"TTS error: {e}")
        return False
    return True


def _split_sentences(cleaned):
    """Split cleaned script text into at most MAX_CLIPS narration chunks."""
    pieces = (piece.strip() for piece in re.split(r'[.!?\n]', cleaned))
    # Fragments of 10 characters or fewer are dropped as too short to narrate.
    sentences = [piece for piece in pieces if len(piece) > 10]
    if not sentences:
        # No usable sentence boundaries: fall back to fixed 8-word chunks.
        words = cleaned.split()
        sentences = [' '.join(words[i:i + 8]) for i in range(0, len(words), 8)]
    return sentences[:MAX_CLIPS]


def _remove_file(path):
    """Delete *path*, tolerating a file that was never created."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass  # TTS failed or was never reached for this clip.
    except OSError as e:
        print(f"Could not remove {path}: {e}")


def generate_video(script, title, description):
    """Render *script* into a narrated slideshow written to ``VIDEO_PATH``.

    Args:
        script: Raw script text; it is cleaned and split into sentences.
        title: Unused here; kept so the signature matches the caller.
        description: Unused here; kept so the signature matches the caller.

    Returns:
        ``True`` when the video file was written, ``False`` when the script
        produced no clips. Errors from moviepy while loading audio or writing
        the video propagate to the caller.
    """
    print("Starting cinematic video generation...")
    print(f"Script received: {script[:100]}")

    sentences = _split_sentences(clean_script(script))
    print(f"Total clips: {len(sentences)}")

    clips = []
    audio_clips = []   # kept so their ffmpeg readers can be closed afterwards
    audio_paths = []   # every temp file we may have created
    try:
        for i, sentence in enumerate(sentences):
            print(f"Creating clip {i+1}/{len(sentences)}: {sentence[:40]}")

            # Different Pexels image for each clip
            keyword = KEYWORDS[i % len(KEYWORDS)]
            bg_image = get_pexels_image(keyword)

            # TTS audio
            audio_path = AUDIO_PATH_TEMPLATE.format(index=i)
            audio_paths.append(audio_path)
            has_audio = generate_tts(sentence, audio_path)

            # Create frame - no text overlay
            frame = make_frame(bg_image)

            if has_audio and os.path.exists(audio_path):
                audio = AudioFileClip(audio_path)
                audio_clips.append(audio)
                # Half a second of padding after the narration, but never
                # shorter than the minimum clip length.
                duration = max(audio.duration + 0.5, DURATION_PER_LINE)
                clip = ImageClip(frame, duration=duration).set_audio(audio)
            else:
                clip = ImageClip(frame, duration=DURATION_PER_LINE)

            clips.append(clip.fadein(0.5).fadeout(0.5))

        if not clips:
            print("No clips generated!")
            return False

        print("Combining clips...")
        final = concatenate_videoclips(clips, method="compose")

        print("Writing video...")
        final.write_videofile(
            VIDEO_PATH,
            fps=FPS,
            codec='libx264',
            audio_codec='aac',
            verbose=False,
            logger=None,
        )
    finally:
        # Runs on success and on failure: release the audio readers first,
        # then delete the temporary narration files.
        for audio in audio_clips:
            audio.close()
        for path in audio_paths:
            _remove_file(path)

    print("Cinematic video generated successfully!")
    return True


def main():
    """Command-line entry point; returns the process exit status."""
    if len(sys.argv) < 4:
        print(f"Usage: {sys.argv[0]} SCRIPT TITLE DESCRIPTION")
        return 1

    script, title, description = sys.argv[1:4]
    if not generate_video(script, title, description):
        print("Video generation failed!")
        return 1

    print("Uploading to YouTube...")
    # NOTE(review): upload is a project-local module; presumably it picks up
    # the rendered file from VIDEO_PATH - confirm against upload.py.
    video_id = upload.upload_video(title, description)
    print(f"SUCCESS: https://youtube.com/shorts/{video_id}")
    return 0


if __name__ == '__main__':
    sys.exit(main())