| import sys |
| import os |
| import re |
| import time |
| import textwrap |
| import numpy as np |
| import requests |
| from PIL import Image, ImageDraw, ImageFont |
| from moviepy.editor import ImageClip, concatenate_videoclips, AudioFileClip |
| from io import BytesIO |
| import asyncio |
| import edge_tts |
| import upload |
|
|
| WIDTH, HEIGHT = 1080, 1920 |
| FPS = 30 |
| DURATION_PER_LINE = 4 |
| PEXELS_API_KEY = os.environ.get('PEXELS_API_KEY', '') |
|
|
| KEYWORDS = [ |
| "india city", "india technology", "india traffic", |
| "india government", "india future", "india electric car", |
| "india startup", "india innovation", "india growth", |
| "india 2026", "india street", "india economy" |
| ] |
|
|
| def clean_script(text): |
| text = re.sub(r'\*\*.*?\*\*', '', text) |
| text = re.sub(r'\(.*?\)', '', text) |
| text = re.sub(r'Host:|Expert:|Narrator:|Title:|Script:', '', text) |
| text = re.sub(r'\[.*?\]', '', text) |
| text = re.sub(r'#\w+', '', text) |
| return text.strip() |
|
|
| def get_pexels_image(keyword): |
| try: |
| headers = {'Authorization': PEXELS_API_KEY} |
| r = requests.get( |
| f'https://api.pexels.com/v1/search?query={keyword}&per_page=10&orientation=portrait', |
| headers=headers, timeout=15 |
| ) |
| data = r.json() |
| if data.get('photos'): |
| import random |
| photo = random.choice(data['photos']) |
| photo_url = photo['src']['portrait'] |
| img_response = requests.get(photo_url, timeout=15) |
| img = Image.open(BytesIO(img_response.content)).convert('RGB') |
| img = img.resize((WIDTH, HEIGHT)) |
| print(f"Pexels image fetched for '{keyword}' ✅") |
| return img |
| else: |
| print(f"No photos for: {keyword}") |
| return None |
| except Exception as e: |
| print(f"Pexels error: {e}") |
| return None |
|
|
| def make_frame(bg_image=None): |
| if bg_image: |
| img = bg_image.copy() |
| else: |
| img = Image.new('RGB', (WIDTH, HEIGHT), color=(15, 15, 25)) |
| return np.array(img) |
|
|
| def generate_tts(text, output_path): |
| try: |
| async def speak(): |
| communicate = edge_tts.Communicate(text, voice="en-IN-NeerjaNeural") |
| await communicate.save(output_path) |
| asyncio.run(speak()) |
| return True |
| except Exception as e: |
| print(f"TTS error: {e}") |
| return False |
|
|
| def generate_video(script, title, description): |
| print("Starting cinematic video generation...") |
| print(f"Script received: {script[:100]}") |
|
|
| cleaned = clean_script(script) |
|
|
| sentences = re.split(r'[.!?\n]', cleaned) |
| sentences = [s.strip() for s in sentences if len(s.strip()) > 10] |
|
|
| if not sentences: |
| words = cleaned.split() |
| sentences = [' '.join(words[i:i+8]) for i in range(0, len(words), 8)] |
|
|
| sentences = sentences[:10] |
| print(f"Total clips: {len(sentences)}") |
|
|
| clips = [] |
| for i, sentence in enumerate(sentences): |
| print(f"Creating clip {i+1}/{len(sentences)}: {sentence[:40]}") |
|
|
| |
| keyword = KEYWORDS[i % len(KEYWORDS)] |
| bg_image = get_pexels_image(keyword) |
|
|
| |
| audio_path = f'/app/audio_{i}.mp3' |
| has_audio = generate_tts(sentence, audio_path) |
|
|
| |
| frame = make_frame(bg_image) |
|
|
| if has_audio and os.path.exists(audio_path): |
| audio = AudioFileClip(audio_path) |
| duration = max(audio.duration + 0.5, DURATION_PER_LINE) |
| clip = ImageClip(frame, duration=duration) |
| clip = clip.set_audio(audio) |
| else: |
| clip = ImageClip(frame, duration=DURATION_PER_LINE) |
|
|
| clip = clip.fadein(0.5).fadeout(0.5) |
| clips.append(clip) |
|
|
| if not clips: |
| print("No clips generated!") |
| return False |
|
|
| print("Combining clips...") |
| final = concatenate_videoclips(clips, method="compose") |
| output_path = '/app/video.mp4' |
| print("Writing video...") |
| final.write_videofile( |
| output_path, |
| fps=FPS, |
| codec='libx264', |
| audio_codec='aac', |
| verbose=False, |
| logger=None |
| ) |
|
|
| for i in range(len(sentences)): |
| try: |
| os.remove(f'/app/audio_{i}.mp3') |
| except: |
| pass |
|
|
| print("Cinematic video generated successfully!") |
| return True |
|
|
| if __name__ == '__main__': |
| script = sys.argv[1] |
| title = sys.argv[2] |
| description = sys.argv[3] |
| success = generate_video(script, title, description) |
| if success: |
| print("Uploading to YouTube...") |
| video_id = upload.upload_video(title, description) |
| print(f"SUCCESS: https://youtube.com/shorts/{video_id}") |
| else: |
| print("Video generation failed!") |
|
|
|
|
|
|