Spaces:
Runtime error
Runtime error
| import os | |
| import requests | |
| import asyncio | |
| import edge_tts | |
| import subprocess | |
| import re | |
| from telegram import Update, InputFile | |
| from telegram.ext import ApplicationBuilder, CommandHandler, MessageHandler, filters, ContextTypes | |
| # ========== SCRIPT GENERATION ========== | |
def get_script(prompt, word_count):
    """Fetch a generated script from the Pollinations text endpoint.

    Args:
        prompt: Free-form topic or instruction for the script.
        word_count: Requested script length in words; it is appended to the
            prompt as "(<word_count> words)".

    Returns:
        The response body with surrounding whitespace removed.

    Raises:
        requests.RequestException: On network failure, timeout, or a
            non-2xx HTTP status.
    """
    from urllib.parse import quote

    # Percent-encode the whole prompt so characters such as "?", "#" and "/"
    # stay inside the path segment instead of being parsed as URL syntax.
    encoded_prompt = quote(f"{prompt} ({word_count} words)", safe="")
    url = f"https://text.pollinations.ai/{encoded_prompt}"
    # A timeout keeps a stalled request from hanging the caller indefinitely.
    response = requests.get(url, timeout=120)
    # Fail loudly rather than treating an HTTP error page as the script.
    response.raise_for_status()
    return response.text.strip()
| # ========== TEXT SPLITTING ========== | |
def split_sentences(text):
    """Split text into sentences at terminal punctuation.

    A break is made at whitespace that follows ".", "?", "!" or the
    Devanagari danda "।"; the punctuation stays attached to its sentence.

    Args:
        text: The text to split.

    Returns:
        A list of non-empty sentence strings; an empty list for blank input.
    """
    parts = re.split(r'(?<=[.?!।])\s+', text.strip())
    # Blank input would otherwise yield [""], i.e. one phantom empty sentence.
    return [part for part in parts if part]
| # ========== IMAGE GENERATION ========== | |
def download_images(sentences):
    """Download one generated image per sentence into the ``images`` folder.

    Each sentence is appended to a fixed style prefix and sent to the
    Pollinations image endpoint; the response body is saved as
    ``images/img_<index>.jpg`` (index zero-padded to three digits).

    Args:
        sentences: Iterable of sentence strings used as image prompts.

    Returns:
        A list of the written image paths, in sentence order.

    Raises:
        requests.RequestException: On network failure, timeout, or a
            non-2xx HTTP status for any image.
    """
    from urllib.parse import quote

    os.makedirs("images", exist_ok=True)
    image_paths = []
    for idx, sentence in enumerate(sentences):
        # Sentences routinely end in "?" and may contain "#" or "/"; encode
        # them so they remain part of the prompt path segment.
        encoded_prompt = quote(f" stunning 3d render styled images {sentence}", safe="")
        url = f"https://image.pollinations.ai/prompt/{encoded_prompt}?&nologo=True"
        img_path = f"images/img_{idx:03}.jpg"
        response = requests.get(url, timeout=180)
        # Without this check an HTTP error body would be saved as a ".jpg".
        response.raise_for_status()
        with open(img_path, "wb") as f:
            f.write(response.content)
        image_paths.append(img_path)
    return image_paths
| # ========== AUDIO DURATION ========== | |
def get_audio_duration(audio_path):
    """Return the duration of an audio file in seconds, as reported by ffprobe.

    Args:
        audio_path: Path of the audio file to inspect.

    Returns:
        The container duration as a float number of seconds.

    Raises:
        RuntimeError: If ffprobe exits with a non-zero status.
        ValueError: If ffprobe's output cannot be parsed as a number.
        OSError: If the ffprobe executable cannot be started.
    """
    result = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", audio_path],
        stdout=subprocess.PIPE,
        # Keep diagnostics on their own pipe so they cannot end up mixed into
        # the number parsed from stdout.
        stderr=subprocess.PIPE,
        text=True,
    )
    if result.returncode != 0:
        raise RuntimeError(
            f"ffprobe failed for {audio_path!r}: {result.stderr.strip()}"
        )
    return float(result.stdout.strip())
| # ========== TTS + SUBTITLES ========== | |
def _format_srt_time(seconds):
    """Render an offset in seconds as an SRT ``HH:MM:SS,mmm`` timestamp."""
    # Work in whole milliseconds so float error cannot truncate a value such
    # as 1.001 s down to ",000".
    total_ms = int(round(seconds * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, s = divmod(total_seconds, 60)
    h, m = divmod(total_minutes, 60)
    return f"{h:02}:{m:02}:{s:02},{ms:03}"


async def generate_audio_and_sentence_subs(sentences, voice="hi-IN-SwaraNeural"):
    """Synthesize speech for the sentences and write evenly timed subtitles.

    The sentences are joined with spaces and streamed through edge-tts into
    ``output.mp3``. The measured audio duration is then divided equally among
    the sentences to produce one cue per sentence in ``subtitles.srt``, so cue
    timing is an even split rather than the real spoken boundaries.

    Args:
        sentences: Non-empty list of sentence strings.
        voice: edge-tts voice name used for synthesis.

    Returns:
        A tuple ``(audio_path, srt_path, per_sentence)`` where
        ``per_sentence`` is the number of seconds allotted to each sentence.

    Raises:
        ValueError: If ``sentences`` is empty.
    """
    # Checked up front: an empty list would otherwise fail later with a
    # ZeroDivisionError, after the synthesis request has already been made.
    if not sentences:
        raise ValueError("sentences must contain at least one sentence")

    audio_path = "output.mp3"
    srt_path = "subtitles.srt"
    full_text = " ".join(sentences)
    communicate = edge_tts.Communicate(full_text, voice)
    with open(audio_path, "wb") as f:
        async for chunk in communicate.stream():
            # Only chunks tagged "audio" are written to the file.
            if chunk["type"] == "audio":
                f.write(chunk["data"])

    duration = get_audio_duration(audio_path)
    per_sentence = duration / len(sentences)
    with open(srt_path, "w", encoding="utf-8") as f_srt:
        for i, sentence in enumerate(sentences):
            start = i * per_sentence
            end = (i + 1) * per_sentence
            # Collapse internal whitespace: a blank line inside the cue text
            # would terminate the SRT cue early.
            caption = " ".join(sentence.split())
            f_srt.write(f"{i+1}\n")
            f_srt.write(f"{_format_srt_time(start)} --> {_format_srt_time(end)}\n")
            f_srt.write(f"{caption}\n\n")
    return audio_path, srt_path, per_sentence
| # ========== VIDEO CREATION ========== | |
def make_video(image_paths, audio_path, srt_path, duration_per_image, font_size, font_family, output="final_video.mp4"):
    """Build a subtitled 720x1280 slideshow video from images and audio.

    Writes an ffmpeg concat list to ``images.txt``, renders the slideshow to
    ``temp_video.mp4``, then muxes in the audio and burns in the subtitles.

    Args:
        image_paths: Non-empty list of image files, shown in order.
        audio_path: Audio track to add to the video.
        srt_path: Subtitle file to burn in. It is inserted into the ffmpeg
            filter string as-is, so it must not contain filter
            metacharacters.
        duration_per_image: Seconds each image stays on screen.
        font_size: Subtitle font size; converted with ``int()``.
        font_family: Subtitle font name; only letters, digits, underscores,
            spaces, dots and hyphens are accepted.
        output: Path of the final video file.

    Returns:
        The ``output`` path.

    Raises:
        ValueError: If ``image_paths`` is empty, ``font_size`` is not an
            integer value, or ``font_family`` contains unsupported characters.
        subprocess.CalledProcessError: If either ffmpeg run fails.
    """
    # Validate before touching the filesystem so bad input leaves no
    # half-written artifacts behind.
    if not image_paths:
        raise ValueError("image_paths must contain at least one image")
    # font_family and font_size are spliced into an ffmpeg filter string; a
    # quote, comma or colon there could alter the filter graph.
    font_family = str(font_family)
    if not re.fullmatch(r"[\w .\-]+", font_family):
        raise ValueError(f"Unsupported font name: {font_family!r}")
    font_size = int(font_size)

    def concat_entry(path):
        # Inside a single-quoted concat-list value a literal quote is written
        # by closing the quote, escaping it, and reopening: '\''
        escaped = os.path.abspath(path).replace("'", "'\\''")
        return f"file '{escaped}'\n"

    with open("images.txt", "w", encoding="utf-8") as f:
        for path in image_paths:
            f.write(concat_entry(path))
            f.write(f"duration {duration_per_image}\n")
        # The last image is listed once more without a duration; presumably
        # this is so the concat demuxer honors the final entry's duration.
        f.write(concat_entry(image_paths[-1]))

    # check=True: a failed render must not be reported as a finished video.
    subprocess.run([
        "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", "images.txt",
        "-vsync", "vfr", "-pix_fmt", "yuv420p", "-vf", "scale=720:1280", "temp_video.mp4"
    ], check=True)
    subprocess.run([
        "ffmpeg", "-y", "-i", "temp_video.mp4", "-i", audio_path, "-vf",
        f"subtitles={srt_path}:force_style='FontName={font_family},FontSize={font_size},BorderStyle=3,Outline=1,Shadow=0,Alignment=2'",
        "-c:a", "aac", "-b:a", "192k", output
    ], check=True)
    return output
| # ========== TELEGRAM BOT ========== | |
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Turn a ``prompt;word_count;font_size;font_name`` message into a video.

    Parses the four semicolon-separated fields, generates the script, images,
    narration and subtitles, renders the video and replies with it. Progress
    and any failure are reported back to the chat as text replies.

    Args:
        update: Incoming Telegram update carrying the text message.
        context: Handler context supplied by the library (unused).
    """
    # effective_message also covers edited messages and channel posts, for
    # which update.message is None.
    message = update.effective_message
    if message is None or not message.text:
        return

    try:
        fields = [x.strip() for x in message.text.strip().split(";")]
        # Exactly four fields are required; too few or too many would
        # otherwise surface as a raw tuple-unpacking error.
        if len(fields) != 4:
            await message.reply_text("Send prompt in format: `prompt;word_count;font_size;font_name`", parse_mode="Markdown")
            return
        prompt, word_count, font_size, font_family = fields
        try:
            word_count = int(word_count)
            font_size = int(font_size)
        except ValueError:
            await message.reply_text("word_count and font_size must be whole numbers.")
            return

        # The HTTP downloads and ffmpeg runs are blocking calls; run them in
        # the default executor so the event loop stays responsive.
        loop = asyncio.get_running_loop()

        await message.reply_text("Generating script...")
        script = await loop.run_in_executor(None, get_script, prompt, word_count)
        sentences = split_sentences(script)
        image_paths = await loop.run_in_executor(None, download_images, sentences)

        await message.reply_text("Generating audio & subtitles...")
        audio_path, srt_path, duration_per_image = await generate_audio_and_sentence_subs(sentences)

        await message.reply_text("Creating video...")
        video_path = await loop.run_in_executor(
            None, make_video, image_paths, audio_path, srt_path,
            duration_per_image, font_size, font_family,
        )

        # Pass an open binary file: InputFile(<str>) treats the string itself
        # as the file content, so the path text would be uploaded instead of
        # the video.
        with open(video_path, "rb") as video_file:
            await message.reply_video(video=video_file, caption="Here is your video")
    except Exception as e:
        # Top-level boundary for the handler: report the failure to the user.
        await message.reply_text(f"Error: {e}")
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    # The token is read only from the environment. A credential embedded in
    # source is exposed to anyone who can read the file; any token that was
    # ever committed should be treated as compromised and revoked.
    TOKEN = os.getenv("BOT_TOKEN")
    if not TOKEN:
        raise SystemExit("Set the BOT_TOKEN environment variable to the Telegram bot token.")
    app = ApplicationBuilder().token(TOKEN).build()
    # Handle plain text messages only; commands are excluded.
    app.add_handler(MessageHandler(filters.TEXT & (~filters.COMMAND), handle_message))
    app.run_polling()