Spaces:
Paused
Paused
| import os | |
| import re | |
| import asyncio | |
| import threading | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime | |
| import yt_dlp | |
| from flask import Flask | |
| from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
| from telegram.ext import ( | |
| Application, | |
| CommandHandler, | |
| MessageHandler, | |
| CallbackQueryHandler, | |
| ContextTypes, | |
| filters, | |
| ) | |
| from telegram.request import HTTPXRequest | |
| logging.basicConfig( | |
| format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", | |
| level=logging.INFO | |
| ) | |
| logger = logging.getLogger(__name__) | |
| BOT_TOKEN = os.environ.get("BOT_TOKEN") | |
| TELEGRAM_API_BASE = os.environ.get("TELEGRAM_API_BASE", "https://api.telegram.org") | |
| PROXY_URL = os.environ.get("PROXY_URL", "socks5://127.0.0.1:1080") | |
| DOWNLOAD_DIR = Path("/app/downloads") | |
| DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True) | |
| MAX_FILE_SIZE = 50 * 1024 * 1024 | |
| YOUTUBE_REGEX = re.compile( | |
| r'(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/' | |
| r'(watch\?v=|embed/|v/|.+\?v=|shorts/)?([^&=%\?/]{11})' | |
| ) | |
| flask_app = Flask(__name__) | |
| def health_check(): | |
| return {"status": "ok", "message": "YouTube Downloader Bot is running"} | |
| def health(): | |
| return {"status": "healthy", "timestamp": datetime.now().isoformat()} | |
| VIDEO_QUALITIES = [ | |
| ("360p", "best[height<=360]"), | |
| ("480p", "best[height<=480]"), | |
| ("720p", "best[height<=720]"), | |
| ("1080p", "best[height<=1080]"), | |
| ] | |
| def get_main_menu_keyboard(): | |
| keyboard = [ | |
| [InlineKeyboardButton("πΊ Cara Menggunakan", callback_data="help")], | |
| [InlineKeyboardButton("βΉοΈ Tentang Bot", callback_data="about")], | |
| ] | |
| return InlineKeyboardMarkup(keyboard) | |
| def get_format_keyboard(video_id: str): | |
| keyboard = [ | |
| [ | |
| InlineKeyboardButton("π¬ Video", callback_data=f"format:video:{video_id}"), | |
| InlineKeyboardButton("π΅ Audio (MP3)", callback_data=f"format:audio:{video_id}"), | |
| ], | |
| [InlineKeyboardButton("β Batal", callback_data="cancel")], | |
| ] | |
| return InlineKeyboardMarkup(keyboard) | |
| def get_quality_keyboard(video_id: str): | |
| keyboard = [] | |
| row = [] | |
| for i, (label, _) in enumerate(VIDEO_QUALITIES): | |
| row.append(InlineKeyboardButton(label, callback_data=f"quality:{label}:{video_id}")) | |
| if len(row) == 2: | |
| keyboard.append(row) | |
| row = [] | |
| if row: | |
| keyboard.append(row) | |
| keyboard.append([InlineKeyboardButton("β Batal", callback_data="cancel")]) | |
| return InlineKeyboardMarkup(keyboard) | |
| def extract_video_id(url: str) -> str | None: | |
| match = YOUTUBE_REGEX.search(url) | |
| if match: | |
| return match.group(6) | |
| return None | |
| class ProgressTracker: | |
| """Track download progress and update Telegram message.""" | |
| def __init__(self, context, chat_id, message_id, format_type): | |
| self.context = context | |
| self.chat_id = chat_id | |
| self.message_id = message_id | |
| self.format_type = format_type | |
| self.last_update = 0 | |
| self.last_percent = -1 | |
| def progress_hook(self, d): | |
| """Called by yt-dlp during download.""" | |
| if d['status'] == 'downloading': | |
| try: | |
| percent_str = d.get('_percent_str', '0%').strip('%') | |
| percent = float(percent_str) | |
| # Update only if significant change (every 10%) | |
| if percent - self.last_percent >= 10: | |
| self.last_percent = percent | |
| # Create update task | |
| asyncio.create_task(self.update_message(percent)) | |
| except: | |
| pass | |
| async def update_message(self, percent): | |
| """Update telegram message with progress.""" | |
| try: | |
| bars = int(percent / 10) | |
| progress_bar = "β" * bars + "β" * (10 - bars) | |
| text = ( | |
| f"β³ Mengunduh {self.format_type}...\n\n" | |
| f"[{progress_bar}] {percent:.0f}%" | |
| ) | |
| await self.context.bot.edit_message_text( | |
| chat_id=self.chat_id, | |
| message_id=self.message_id, | |
| text=text | |
| ) | |
| except Exception as e: | |
| logger.debug(f"Progress update error: {e}") | |
| def get_ydl_opts(proxy: str = None, progress_hook=None) -> dict: | |
| opts = { | |
| 'quiet': True, | |
| 'no_warnings': True, | |
| 'extract_flat': False, | |
| 'socket_timeout': 30, | |
| } | |
| if proxy: | |
| opts['proxy'] = proxy | |
| if progress_hook: | |
| opts['progress_hooks'] = [progress_hook] | |
| return opts | |
| def get_video_info(url: str) -> dict | None: | |
| ydl_opts = get_ydl_opts(PROXY_URL) | |
| try: | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| info = ydl.extract_info(url, download=False) | |
| return { | |
| 'title': info.get('title', 'Unknown'), | |
| 'duration': info.get('duration', 0), | |
| 'thumbnail': info.get('thumbnail', ''), | |
| 'uploader': info.get('uploader', 'Unknown'), | |
| } | |
| except Exception as e: | |
| logger.error(f"Error getting video info: {e}") | |
| return None | |
| def download_video_with_progress(url: str, quality: str, video_id: str, progress_hook=None) -> Path | None: | |
| output_path = DOWNLOAD_DIR / f"{video_id}.mp4" | |
| format_str = "best" | |
| for label, fmt in VIDEO_QUALITIES: | |
| if label == quality: | |
| format_str = fmt | |
| break | |
| ydl_opts = get_ydl_opts(PROXY_URL, progress_hook) | |
| ydl_opts.update({ | |
| 'format': f'{format_str}[ext=mp4]/best[ext=mp4]/best', | |
| 'outtmpl': str(output_path), | |
| 'merge_output_format': 'mp4', | |
| 'concurrent_fragment_downloads': 5, | |
| 'retries': 10, | |
| 'fragment_retries': 10, | |
| }) | |
| try: | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| ydl.download([url]) | |
| if output_path.exists(): | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Error downloading video: {e}") | |
| return None | |
| def download_audio_with_progress(url: str, video_id: str, progress_hook=None) -> Path | None: | |
| output_path = DOWNLOAD_DIR / f"{video_id}.mp3" | |
| ydl_opts = get_ydl_opts(PROXY_URL, progress_hook) | |
| ydl_opts.update({ | |
| 'format': 'bestaudio/best', | |
| 'outtmpl': str(DOWNLOAD_DIR / f"{video_id}.%(ext)s"), | |
| 'postprocessors': [{ | |
| 'key': 'FFmpegExtractAudio', | |
| 'preferredcodec': 'mp3', | |
| 'preferredquality': '192', | |
| }], | |
| 'retries': 10, | |
| 'fragment_retries': 10, | |
| }) | |
| try: | |
| with yt_dlp.YoutubeDL(ydl_opts) as ydl: | |
| ydl.download([url]) | |
| if output_path.exists(): | |
| return output_path | |
| except Exception as e: | |
| logger.error(f"Error downloading audio: {e}") | |
| return None | |
| def cleanup_file(file_path: Path): | |
| try: | |
| if file_path.exists(): | |
| file_path.unlink() | |
| except Exception as e: | |
| logger.error(f"Error cleaning up file: {e}") | |
| async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| welcome_text = ( | |
| "π¬ *YouTube Downloader Bot*\n\n" | |
| "Selamat datang! Bot ini membantu Anda download video YouTube.\n\n" | |
| "π *Cara Menggunakan:*\n" | |
| "1. Kirim link YouTube\n" | |
| "2. Pilih format (Video/Audio)\n" | |
| "3. Pilih kualitas video\n" | |
| "4. Tunggu proses download\n\n" | |
| f"π Proxy: `{PROXY_URL}`\n" | |
| "β οΈ *Catatan:* Maksimal ukuran file 50MB" | |
| ) | |
| await update.message.reply_text( | |
| welcome_text, | |
| parse_mode="Markdown", | |
| reply_markup=get_main_menu_keyboard() | |
| ) | |
| async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| help_text = ( | |
| "π *Panduan Penggunaan*\n\n" | |
| "1οΈβ£ Kirim link YouTube ke bot\n" | |
| "2οΈβ£ Pilih format download:\n" | |
| " β’ π¬ Video - download dengan video\n" | |
| " β’ π΅ Audio - download MP3 saja\n" | |
| "3οΈβ£ Pilih kualitas (untuk video)\n" | |
| "4οΈβ£ Tunggu proses selesai\n\n" | |
| "π *Tips:*\n" | |
| "β’ Untuk video pendek, pilih 720p atau 1080p\n" | |
| "β’ Untuk video panjang, pilih 360p atau 480p\n" | |
| "β’ Audio MP3 cocok untuk musik" | |
| ) | |
| await update.message.reply_text(help_text, parse_mode="Markdown") | |
| async def handle_youtube_url(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| text = update.message.text | |
| video_id = extract_video_id(text) | |
| if not video_id: | |
| await update.message.reply_text( | |
| "β Link tidak valid. Silakan kirim link YouTube yang benar." | |
| ) | |
| return | |
| context.user_data['current_url'] = text | |
| context.user_data['current_video_id'] = video_id | |
| status_msg = await update.message.reply_text("β³ Mengambil informasi video...") | |
| loop = asyncio.get_event_loop() | |
| info = await loop.run_in_executor(None, get_video_info, text) | |
| if not info: | |
| await status_msg.edit_text( | |
| "β Gagal mengambil informasi video.\n\n" | |
| "Kemungkinan penyebab:\n" | |
| "β’ Video tidak tersedia\n" | |
| "β’ Proxy tidak berfungsi\n" | |
| "β’ Video dilindungi regional" | |
| ) | |
| return | |
| context.user_data['video_info'] = info | |
| duration = info['duration'] | |
| mins, secs = divmod(duration, 60) | |
| duration_str = f"{mins}:{secs:02d}" | |
| info_text = ( | |
| f"πΊ *{info['title']}*\n\n" | |
| f"π€ Channel: {info['uploader']}\n" | |
| f"β± Durasi: {duration_str}\n\n" | |
| "Pilih format download:" | |
| ) | |
| await status_msg.edit_text( | |
| info_text, | |
| parse_mode="Markdown", | |
| reply_markup=get_format_keyboard(video_id) | |
| ) | |
| async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| query = update.callback_query | |
| await query.answer() | |
| data = query.data | |
| if data == "cancel": | |
| await query.edit_message_text("β Download dibatalkan.") | |
| return | |
| if data == "help": | |
| help_text = ( | |
| "π *Panduan Penggunaan*\n\n" | |
| "1οΈβ£ Kirim link YouTube ke bot\n" | |
| "2οΈβ£ Pilih format download\n" | |
| "3οΈβ£ Pilih kualitas (untuk video)\n" | |
| "4οΈβ£ Tunggu proses selesai" | |
| ) | |
| await query.edit_message_text(help_text, parse_mode="Markdown") | |
| return | |
| if data == "about": | |
| about_text = ( | |
| "βΉοΈ *Tentang Bot*\n\n" | |
| "Bot ini dibuat untuk memudahkan download video YouTube.\n\n" | |
| "π§ Teknologi:\n" | |
| "β’ Python + python-telegram-bot\n" | |
| "β’ yt-dlp untuk download\n" | |
| "β’ Xray untuk proxy\n" | |
| "β’ Hugging Face Spaces\n\n" | |
| "β οΈ Gunakan dengan bijak!" | |
| ) | |
| await query.edit_message_text(about_text, parse_mode="Markdown") | |
| return | |
| if data.startswith("format:"): | |
| parts = data.split(":") | |
| format_type = parts[1] | |
| video_id = parts[2] | |
| url = context.user_data.get('current_url') | |
| if not url: | |
| await query.edit_message_text("β Sesi expired. Silakan kirim link lagi.") | |
| return | |
| if format_type == "audio": | |
| await query.edit_message_text("β³ Memproses download audio...\n\n[ββββββββββ] 0%") | |
| # Setup progress tracker | |
| progress = ProgressTracker( | |
| context, | |
| query.message.chat_id, | |
| query.message.message_id, | |
| "audio" | |
| ) | |
| loop = asyncio.get_event_loop() | |
| file_path = await loop.run_in_executor( | |
| None, | |
| download_audio_with_progress, | |
| url, | |
| video_id, | |
| progress.progress_hook | |
| ) | |
| if not file_path: | |
| await query.edit_message_text("β Gagal download audio. Coba lagi.") | |
| return | |
| file_size = file_path.stat().st_size | |
| if file_size > MAX_FILE_SIZE: | |
| cleanup_file(file_path) | |
| await query.edit_message_text( | |
| "β File terlalu besar (>50MB). Tidak bisa dikirim via Telegram." | |
| ) | |
| return | |
| await query.edit_message_text("π€ Mengirim audio...") | |
| info = context.user_data.get('video_info', {}) | |
| try: | |
| with open(file_path, 'rb') as audio_file: | |
| await context.bot.send_audio( | |
| chat_id=query.message.chat_id, | |
| audio=audio_file, | |
| title=info.get('title', 'Audio'), | |
| performer=info.get('uploader', 'Unknown'), | |
| ) | |
| await query.edit_message_text("β Audio berhasil dikirim!") | |
| except Exception as e: | |
| logger.error(f"Error sending audio: {e}") | |
| await query.edit_message_text("β Gagal mengirim audio.") | |
| finally: | |
| cleanup_file(file_path) | |
| else: | |
| await query.edit_message_text( | |
| "π¬ Pilih kualitas video:", | |
| reply_markup=get_quality_keyboard(video_id) | |
| ) | |
| elif data.startswith("quality:"): | |
| parts = data.split(":") | |
| quality = parts[1] | |
| video_id = parts[2] | |
| url = context.user_data.get('current_url') | |
| if not url: | |
| await query.edit_message_text("β Sesi expired. Silakan kirim link lagi.") | |
| return | |
| await query.edit_message_text(f"β³ Mengunduh video ({quality})...\n\n[ββββββββββ] 0%") | |
| # Setup progress tracker | |
| progress = ProgressTracker( | |
| context, | |
| query.message.chat_id, | |
| query.message.message_id, | |
| f"video {quality}" | |
| ) | |
| loop = asyncio.get_event_loop() | |
| file_path = await loop.run_in_executor( | |
| None, | |
| download_video_with_progress, | |
| url, | |
| quality, | |
| video_id, | |
| progress.progress_hook | |
| ) | |
| if not file_path: | |
| await query.edit_message_text("β Gagal download video. Coba lagi.") | |
| return | |
| file_size = file_path.stat().st_size | |
| if file_size > MAX_FILE_SIZE: | |
| cleanup_file(file_path) | |
| await query.edit_message_text( | |
| f"β File terlalu besar ({file_size // (1024*1024)}MB > 50MB).\n" | |
| "Coba pilih kualitas yang lebih rendah." | |
| ) | |
| return | |
| await query.edit_message_text("π€ Mengirim video...") | |
| info = context.user_data.get('video_info', {}) | |
| try: | |
| with open(file_path, 'rb') as video_file: | |
| await context.bot.send_video( | |
| chat_id=query.message.chat_id, | |
| video=video_file, | |
| caption=f"πΊ {info.get('title', 'Video')}", | |
| supports_streaming=True, | |
| ) | |
| await query.edit_message_text("β Video berhasil dikirim!") | |
| except Exception as e: | |
| logger.error(f"Error sending video: {e}") | |
| await query.edit_message_text("β Gagal mengirim video.") | |
| finally: | |
| cleanup_file(file_path) | |
| async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| text = update.message.text | |
| if YOUTUBE_REGEX.search(text): | |
| await handle_youtube_url(update, context) | |
| else: | |
| await update.message.reply_text( | |
| "π Kirim link YouTube untuk memulai download.\n\n" | |
| "Contoh: https://www.youtube.com/watch?v=xxxxx" | |
| ) | |
| def run_flask(): | |
| flask_app.run(host="0.0.0.0", port=7860, threaded=True, use_reloader=False) | |
| def main(): | |
| if not BOT_TOKEN: | |
| logger.error("BOT_TOKEN environment variable not set!") | |
| return | |
| flask_thread = threading.Thread(target=run_flask, daemon=True) | |
| flask_thread.start() | |
| logger.info("Flask health check server started on port 7860") | |
| logger.info(f"Using Telegram API base URL: {TELEGRAM_API_BASE}") | |
| logger.info(f"Using Proxy for yt-dlp: {PROXY_URL}") | |
| request = HTTPXRequest( | |
| connection_pool_size=8, | |
| read_timeout=60.0, | |
| write_timeout=60.0, | |
| connect_timeout=30.0, | |
| ) | |
| application = ( | |
| Application.builder() | |
| .token(BOT_TOKEN) | |
| .base_url(f"{TELEGRAM_API_BASE}/bot") | |
| .base_file_url(f"{TELEGRAM_API_BASE}/file/bot") | |
| .request(request) | |
| .get_updates_request(request) | |
| .build() | |
| ) | |
| application.add_handler(CommandHandler("start", start_command)) | |
| application.add_handler(CommandHandler("help", help_command)) | |
| application.add_handler(CallbackQueryHandler(handle_callback)) | |
| application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message)) | |
| logger.info("Starting YouTube Downloader Bot...") | |
| application.run_polling(allowed_updates=Update.ALL_TYPES) | |
| if __name__ == "__main__": | |
| main() |