downloader / app.py
alphabagibagi's picture
Upload 7 files
fbf9aa4 verified
import os
import re
import asyncio
import threading
import logging
from pathlib import Path
from datetime import datetime
import yt_dlp
from flask import Flask
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
ContextTypes,
filters,
)
from telegram.request import HTTPXRequest
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO
)
logger = logging.getLogger(__name__)
BOT_TOKEN = os.environ.get("BOT_TOKEN")
TELEGRAM_API_BASE = os.environ.get("TELEGRAM_API_BASE", "https://api.telegram.org")
PROXY_URL = os.environ.get("PROXY_URL", "socks5://127.0.0.1:1080")
DOWNLOAD_DIR = Path("/app/downloads")
DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
MAX_FILE_SIZE = 50 * 1024 * 1024
YOUTUBE_REGEX = re.compile(
r'(https?://)?(www\.)?(youtube|youtu|youtube-nocookie)\.(com|be)/'
r'(watch\?v=|embed/|v/|.+\?v=|shorts/)?([^&=%\?/]{11})'
)
flask_app = Flask(__name__)
@flask_app.route("/")
def health_check():
return {"status": "ok", "message": "YouTube Downloader Bot is running"}
@flask_app.route("/health")
def health():
return {"status": "healthy", "timestamp": datetime.now().isoformat()}
VIDEO_QUALITIES = [
("360p", "best[height<=360]"),
("480p", "best[height<=480]"),
("720p", "best[height<=720]"),
("1080p", "best[height<=1080]"),
]
def get_main_menu_keyboard():
keyboard = [
[InlineKeyboardButton("πŸ“Ί Cara Menggunakan", callback_data="help")],
[InlineKeyboardButton("ℹ️ Tentang Bot", callback_data="about")],
]
return InlineKeyboardMarkup(keyboard)
def get_format_keyboard(video_id: str):
keyboard = [
[
InlineKeyboardButton("🎬 Video", callback_data=f"format:video:{video_id}"),
InlineKeyboardButton("🎡 Audio (MP3)", callback_data=f"format:audio:{video_id}"),
],
[InlineKeyboardButton("❌ Batal", callback_data="cancel")],
]
return InlineKeyboardMarkup(keyboard)
def get_quality_keyboard(video_id: str):
keyboard = []
row = []
for i, (label, _) in enumerate(VIDEO_QUALITIES):
row.append(InlineKeyboardButton(label, callback_data=f"quality:{label}:{video_id}"))
if len(row) == 2:
keyboard.append(row)
row = []
if row:
keyboard.append(row)
keyboard.append([InlineKeyboardButton("❌ Batal", callback_data="cancel")])
return InlineKeyboardMarkup(keyboard)
def extract_video_id(url: str) -> str | None:
match = YOUTUBE_REGEX.search(url)
if match:
return match.group(6)
return None
class ProgressTracker:
"""Track download progress and update Telegram message."""
def __init__(self, context, chat_id, message_id, format_type):
self.context = context
self.chat_id = chat_id
self.message_id = message_id
self.format_type = format_type
self.last_update = 0
self.last_percent = -1
def progress_hook(self, d):
"""Called by yt-dlp during download."""
if d['status'] == 'downloading':
try:
percent_str = d.get('_percent_str', '0%').strip('%')
percent = float(percent_str)
# Update only if significant change (every 10%)
if percent - self.last_percent >= 10:
self.last_percent = percent
# Create update task
asyncio.create_task(self.update_message(percent))
except:
pass
async def update_message(self, percent):
"""Update telegram message with progress."""
try:
bars = int(percent / 10)
progress_bar = "β–“" * bars + "β–‘" * (10 - bars)
text = (
f"⏳ Mengunduh {self.format_type}...\n\n"
f"[{progress_bar}] {percent:.0f}%"
)
await self.context.bot.edit_message_text(
chat_id=self.chat_id,
message_id=self.message_id,
text=text
)
except Exception as e:
logger.debug(f"Progress update error: {e}")
def get_ydl_opts(proxy: str = None, progress_hook=None) -> dict:
opts = {
'quiet': True,
'no_warnings': True,
'extract_flat': False,
'socket_timeout': 30,
}
if proxy:
opts['proxy'] = proxy
if progress_hook:
opts['progress_hooks'] = [progress_hook]
return opts
def get_video_info(url: str) -> dict | None:
ydl_opts = get_ydl_opts(PROXY_URL)
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=False)
return {
'title': info.get('title', 'Unknown'),
'duration': info.get('duration', 0),
'thumbnail': info.get('thumbnail', ''),
'uploader': info.get('uploader', 'Unknown'),
}
except Exception as e:
logger.error(f"Error getting video info: {e}")
return None
def download_video_with_progress(url: str, quality: str, video_id: str, progress_hook=None) -> Path | None:
output_path = DOWNLOAD_DIR / f"{video_id}.mp4"
format_str = "best"
for label, fmt in VIDEO_QUALITIES:
if label == quality:
format_str = fmt
break
ydl_opts = get_ydl_opts(PROXY_URL, progress_hook)
ydl_opts.update({
'format': f'{format_str}[ext=mp4]/best[ext=mp4]/best',
'outtmpl': str(output_path),
'merge_output_format': 'mp4',
'concurrent_fragment_downloads': 5,
'retries': 10,
'fragment_retries': 10,
})
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
if output_path.exists():
return output_path
except Exception as e:
logger.error(f"Error downloading video: {e}")
return None
def download_audio_with_progress(url: str, video_id: str, progress_hook=None) -> Path | None:
output_path = DOWNLOAD_DIR / f"{video_id}.mp3"
ydl_opts = get_ydl_opts(PROXY_URL, progress_hook)
ydl_opts.update({
'format': 'bestaudio/best',
'outtmpl': str(DOWNLOAD_DIR / f"{video_id}.%(ext)s"),
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'retries': 10,
'fragment_retries': 10,
})
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
if output_path.exists():
return output_path
except Exception as e:
logger.error(f"Error downloading audio: {e}")
return None
def cleanup_file(file_path: Path):
try:
if file_path.exists():
file_path.unlink()
except Exception as e:
logger.error(f"Error cleaning up file: {e}")
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
welcome_text = (
"🎬 *YouTube Downloader Bot*\n\n"
"Selamat datang! Bot ini membantu Anda download video YouTube.\n\n"
"πŸ“ *Cara Menggunakan:*\n"
"1. Kirim link YouTube\n"
"2. Pilih format (Video/Audio)\n"
"3. Pilih kualitas video\n"
"4. Tunggu proses download\n\n"
f"🌐 Proxy: `{PROXY_URL}`\n"
"⚠️ *Catatan:* Maksimal ukuran file 50MB"
)
await update.message.reply_text(
welcome_text,
parse_mode="Markdown",
reply_markup=get_main_menu_keyboard()
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
help_text = (
"πŸ“– *Panduan Penggunaan*\n\n"
"1️⃣ Kirim link YouTube ke bot\n"
"2️⃣ Pilih format download:\n"
" β€’ 🎬 Video - download dengan video\n"
" β€’ 🎡 Audio - download MP3 saja\n"
"3️⃣ Pilih kualitas (untuk video)\n"
"4️⃣ Tunggu proses selesai\n\n"
"πŸ“Œ *Tips:*\n"
"β€’ Untuk video pendek, pilih 720p atau 1080p\n"
"β€’ Untuk video panjang, pilih 360p atau 480p\n"
"β€’ Audio MP3 cocok untuk musik"
)
await update.message.reply_text(help_text, parse_mode="Markdown")
async def handle_youtube_url(update: Update, context: ContextTypes.DEFAULT_TYPE):
text = update.message.text
video_id = extract_video_id(text)
if not video_id:
await update.message.reply_text(
"❌ Link tidak valid. Silakan kirim link YouTube yang benar."
)
return
context.user_data['current_url'] = text
context.user_data['current_video_id'] = video_id
status_msg = await update.message.reply_text("⏳ Mengambil informasi video...")
loop = asyncio.get_event_loop()
info = await loop.run_in_executor(None, get_video_info, text)
if not info:
await status_msg.edit_text(
"❌ Gagal mengambil informasi video.\n\n"
"Kemungkinan penyebab:\n"
"β€’ Video tidak tersedia\n"
"β€’ Proxy tidak berfungsi\n"
"β€’ Video dilindungi regional"
)
return
context.user_data['video_info'] = info
duration = info['duration']
mins, secs = divmod(duration, 60)
duration_str = f"{mins}:{secs:02d}"
info_text = (
f"πŸ“Ί *{info['title']}*\n\n"
f"πŸ‘€ Channel: {info['uploader']}\n"
f"⏱ Durasi: {duration_str}\n\n"
"Pilih format download:"
)
await status_msg.edit_text(
info_text,
parse_mode="Markdown",
reply_markup=get_format_keyboard(video_id)
)
async def handle_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query
await query.answer()
data = query.data
if data == "cancel":
await query.edit_message_text("❌ Download dibatalkan.")
return
if data == "help":
help_text = (
"πŸ“– *Panduan Penggunaan*\n\n"
"1️⃣ Kirim link YouTube ke bot\n"
"2️⃣ Pilih format download\n"
"3️⃣ Pilih kualitas (untuk video)\n"
"4️⃣ Tunggu proses selesai"
)
await query.edit_message_text(help_text, parse_mode="Markdown")
return
if data == "about":
about_text = (
"ℹ️ *Tentang Bot*\n\n"
"Bot ini dibuat untuk memudahkan download video YouTube.\n\n"
"πŸ”§ Teknologi:\n"
"β€’ Python + python-telegram-bot\n"
"β€’ yt-dlp untuk download\n"
"β€’ Xray untuk proxy\n"
"β€’ Hugging Face Spaces\n\n"
"⚠️ Gunakan dengan bijak!"
)
await query.edit_message_text(about_text, parse_mode="Markdown")
return
if data.startswith("format:"):
parts = data.split(":")
format_type = parts[1]
video_id = parts[2]
url = context.user_data.get('current_url')
if not url:
await query.edit_message_text("❌ Sesi expired. Silakan kirim link lagi.")
return
if format_type == "audio":
await query.edit_message_text("⏳ Memproses download audio...\n\n[β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘] 0%")
# Setup progress tracker
progress = ProgressTracker(
context,
query.message.chat_id,
query.message.message_id,
"audio"
)
loop = asyncio.get_event_loop()
file_path = await loop.run_in_executor(
None,
download_audio_with_progress,
url,
video_id,
progress.progress_hook
)
if not file_path:
await query.edit_message_text("❌ Gagal download audio. Coba lagi.")
return
file_size = file_path.stat().st_size
if file_size > MAX_FILE_SIZE:
cleanup_file(file_path)
await query.edit_message_text(
"❌ File terlalu besar (>50MB). Tidak bisa dikirim via Telegram."
)
return
await query.edit_message_text("πŸ“€ Mengirim audio...")
info = context.user_data.get('video_info', {})
try:
with open(file_path, 'rb') as audio_file:
await context.bot.send_audio(
chat_id=query.message.chat_id,
audio=audio_file,
title=info.get('title', 'Audio'),
performer=info.get('uploader', 'Unknown'),
)
await query.edit_message_text("βœ… Audio berhasil dikirim!")
except Exception as e:
logger.error(f"Error sending audio: {e}")
await query.edit_message_text("❌ Gagal mengirim audio.")
finally:
cleanup_file(file_path)
else:
await query.edit_message_text(
"🎬 Pilih kualitas video:",
reply_markup=get_quality_keyboard(video_id)
)
elif data.startswith("quality:"):
parts = data.split(":")
quality = parts[1]
video_id = parts[2]
url = context.user_data.get('current_url')
if not url:
await query.edit_message_text("❌ Sesi expired. Silakan kirim link lagi.")
return
await query.edit_message_text(f"⏳ Mengunduh video ({quality})...\n\n[β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘β–‘] 0%")
# Setup progress tracker
progress = ProgressTracker(
context,
query.message.chat_id,
query.message.message_id,
f"video {quality}"
)
loop = asyncio.get_event_loop()
file_path = await loop.run_in_executor(
None,
download_video_with_progress,
url,
quality,
video_id,
progress.progress_hook
)
if not file_path:
await query.edit_message_text("❌ Gagal download video. Coba lagi.")
return
file_size = file_path.stat().st_size
if file_size > MAX_FILE_SIZE:
cleanup_file(file_path)
await query.edit_message_text(
f"❌ File terlalu besar ({file_size // (1024*1024)}MB > 50MB).\n"
"Coba pilih kualitas yang lebih rendah."
)
return
await query.edit_message_text("πŸ“€ Mengirim video...")
info = context.user_data.get('video_info', {})
try:
with open(file_path, 'rb') as video_file:
await context.bot.send_video(
chat_id=query.message.chat_id,
video=video_file,
caption=f"πŸ“Ί {info.get('title', 'Video')}",
supports_streaming=True,
)
await query.edit_message_text("βœ… Video berhasil dikirim!")
except Exception as e:
logger.error(f"Error sending video: {e}")
await query.edit_message_text("❌ Gagal mengirim video.")
finally:
cleanup_file(file_path)
async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
text = update.message.text
if YOUTUBE_REGEX.search(text):
await handle_youtube_url(update, context)
else:
await update.message.reply_text(
"πŸ‘‹ Kirim link YouTube untuk memulai download.\n\n"
"Contoh: https://www.youtube.com/watch?v=xxxxx"
)
def run_flask():
flask_app.run(host="0.0.0.0", port=7860, threaded=True, use_reloader=False)
def main():
if not BOT_TOKEN:
logger.error("BOT_TOKEN environment variable not set!")
return
flask_thread = threading.Thread(target=run_flask, daemon=True)
flask_thread.start()
logger.info("Flask health check server started on port 7860")
logger.info(f"Using Telegram API base URL: {TELEGRAM_API_BASE}")
logger.info(f"Using Proxy for yt-dlp: {PROXY_URL}")
request = HTTPXRequest(
connection_pool_size=8,
read_timeout=60.0,
write_timeout=60.0,
connect_timeout=30.0,
)
application = (
Application.builder()
.token(BOT_TOKEN)
.base_url(f"{TELEGRAM_API_BASE}/bot")
.base_file_url(f"{TELEGRAM_API_BASE}/file/bot")
.request(request)
.get_updates_request(request)
.build()
)
application.add_handler(CommandHandler("start", start_command))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CallbackQueryHandler(handle_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))
logger.info("Starting YouTube Downloader Bot...")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()