| import os |
| import asyncio |
| import tempfile |
| import subprocess |
| import shutil |
| from pathlib import Path |
|
|
| |
| from flask import Flask, request as flask_request, jsonify |
| import threading |
|
|
| from telegram import Update, Message |
| from telegram.ext import ( |
| Application, |
| CommandHandler, |
| MessageHandler, |
| filters, |
| ContextTypes, |
| ) |
| from telegram.constants import ParseMode |
|
|
| |
# Telegram bot token, read from the environment. The placeholder default is
# the value main() compares against to detect a missing token.
BOT_TOKEN = os.getenv("BOT_TOKEN", "YOUR_BOT_TOKEN_HERE")

# Public base URL the webhook is registered under; empty string when unset.
WEBHOOK_URL = os.getenv("WEBHOOK_URL", "")

# File extensions accepted for uploaded documents. Despite the name, the set
# holds video container extensions as well as audio ones.
SUPPORTED_AUDIO_EXTENSIONS = set(
    ".mp3 .wav .flac .ogg .m4a .aac "
    ".wma .opus .aiff .aif .mp4 .mkv "
    ".webm .mov .avi .flv .ts .mts".split()
)

# Upload size ceiling, in megabytes.
MAX_FILE_SIZE_MB = 200
|
|
| |
|
|
def human_size(num_bytes: int) -> str:
    """Format a byte count as a short string with one decimal place.

    The value is divided by 1024 until its magnitude drops below 1024 or the
    GB unit has been tried; anything still larger is reported in TB.
    """
    size = num_bytes
    labels = ["B", "KB", "MB", "GB"]
    idx = 0
    while idx < len(labels):
        if -1024 < size < 1024:
            return f"{size:.1f} {labels[idx]}"
        size = size / 1024
        idx += 1
    return f"{size:.1f} TB"
|
|
|
|
async def edit_or_reply(msg: Message, text: str, **kwargs) -> Message:
    """Replace the text of ``msg``; if that fails, send ``text`` as a reply.

    Extra keyword arguments are forwarded unchanged to whichever call is
    made. Returns whatever ``edit_text`` or ``reply_text`` returned.
    """
    try:
        edited = await msg.edit_text(text, **kwargs)
    except Exception:
        # Best effort: any failure to edit falls back to a fresh reply.
        fallback = await msg.reply_text(text, **kwargs)
        return fallback
    return edited
|
|
|
|
def run_demucs(input_path: str, output_dir: str) -> tuple[str, str]:
    """Split ``input_path`` into vocal and accompaniment MP3 stems with Demucs.

    Runs ``python -m demucs`` in a subprocess with the ``htdemucs`` model in
    two-stem (vocals) mode and MP3 output, writing under ``output_dir``.

    Args:
        input_path: Path of the audio file to separate.
        output_dir: Directory passed to Demucs as its ``-o`` output root.

    Returns:
        ``(vocals_path, no_vocals_path)`` as strings.

    Raises:
        RuntimeError: If Demucs exits with a non-zero status, or if the two
            expected output files are missing afterwards.
    """
    import sys  # local import so the block does not depend on file-level imports

    # Use the interpreter running this process: a bare "python" may resolve to
    # a different installation (or to nothing) that lacks the demucs package.
    interpreter = sys.executable or "python"
    cmd = [
        interpreter, "-m", "demucs",
        "--two-stems", "vocals",
        "-n", "htdemucs",
        "--mp3",
        "-o", output_dir,
        input_path,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        # Only the tail of stderr is kept so the message stays bounded.
        raise RuntimeError(result.stderr[-2000:] or "Demucs failed with no output")

    # The stems are looked up at <output_dir>/htdemucs/<input file stem>/.
    stem_dir = Path(output_dir) / "htdemucs" / Path(input_path).stem
    vocals = stem_dir / "vocals.mp3"
    no_vocals = stem_dir / "no_vocals.mp3"

    if not vocals.exists() or not no_vocals.exists():
        raise RuntimeError(
            f"Expected output files not found in {stem_dir}.\n"
            f"Directory contents: {list(stem_dir.iterdir()) if stem_dir.exists() else 'dir missing'}"
        )
    return str(vocals), str(no_vocals)
|
|
|
|
def extract_audio_from_video(video_path: str, out_wav: str) -> None:
    """Write the audio track of ``video_path`` to ``out_wav`` using ffmpeg.

    ffmpeg is invoked with ``-vn -ar 44100 -ac 2 -c:a pcm_s16le`` and ``-y``
    so an existing ``out_wav`` is overwritten.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status; the message
            carries the last 1500 characters of its stderr.
    """
    ffmpeg_args = ["ffmpeg", "-y", "-i", video_path]
    ffmpeg_args += ["-vn", "-ar", "44100", "-ac", "2"]
    ffmpeg_args += ["-c:a", "pcm_s16le", out_wav]

    proc = subprocess.run(ffmpeg_args, capture_output=True, text=True)
    if proc.returncode == 0:
        return
    raise RuntimeError(f"ffmpeg audio extraction failed:\n{proc.stderr[-1500:]}")
|
|
|
|
# Extensions treated as video containers: documents with one of these
# suffixes have their audio track extracted before separation.
VIDEO_EXTENSIONS = set(".mp4 .mkv .webm .mov .avi .flv .ts .mts".split())
|
|
|
|
| |
|
|
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to ``/start`` with the Arabic welcome message.

    The message lists what the bot produces, the supported extensions and
    the size limit taken from ``MAX_FILE_SIZE_MB``.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # effective_message is also populated for edited messages, where
    # update.message is None and the original code raised AttributeError.
    message = update.effective_message
    if message is None:
        return

    # NOTE(review): the Arabic text was restored from mis-decoded UTF-8. The
    # folder emoji and the arrows had lost bytes and were chosen from context;
    # confirm against the original wording.
    text = (
        "🎵 *مرحباً بك في بوت فصل الصوت!*\n\n"
        "أرسل لي أي ملف صوتي أو فيديو وسأفصل:\n"
        " 🎤 *صوت الغناء / الكلام* → `vocals.mp3`\n"
        " 🎸 *الموسيقى / المصاحبة* → `no_vocals.mp3`\n\n"
        "📂 *الامتدادات المدعومة:*\n"
        "`mp3, wav, flac, ogg, m4a, aac, wma, opus, aiff,\n"
        "mp4, mkv, webm, mov, avi, flv, ts, mts`\n\n"
        f"⚠️ الحد الأقصى لحجم الملف: *{MAX_FILE_SIZE_MB} MB*\n\n"
        "استخدم /help لمزيد من المعلومات."
    )
    await message.reply_text(text, parse_mode=ParseMode.MARKDOWN)
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to ``/help`` with Arabic usage instructions.

    Args:
        update: Incoming Telegram update.
        context: Handler context (unused).
    """
    # effective_message is also populated for edited messages, where
    # update.message is None and the original code raised AttributeError.
    message = update.effective_message
    if message is None:
        return

    # NOTE(review): the Arabic text was restored from mis-decoded UTF-8. The
    # book, brain and memo emoji and the dashes had lost bytes and were chosen
    # from context; confirm against the original wording.
    text = (
        "📖 *كيف أستخدم البوت؟*\n\n"
        "1️⃣ أرسل ملفاً صوتياً أو مقطع فيديو.\n"
        "2️⃣ انتظر قليلاً (الفصل يأخذ 1–5 دقائق حسب طول المقطع).\n"
        "3️⃣ ستصلك ملفان:\n"
        " • `vocals.mp3` — الغناء / الكلام فقط\n"
        " • `no_vocals.mp3` — الموسيقى / المصاحبة فقط\n\n"
        "🧠 *النموذج المستخدم:* htdemucs (Meta AI)\n"
        "📝 *ملاحظة:* جودة الفصل تعتمد على وضوح الصوت الأصلي."
    )
    await message.reply_text(text, parse_mode=ParseMode.MARKDOWN)
|
|
|
|
| |
|
|
def _inline_code_safe(text: str, limit: int = 800) -> str:
    """Trim ``text`` to ``limit`` characters and make it safe in a Markdown code span."""
    # A backtick would close the span early and Telegram would reject the message.
    return text[:limit].replace("`", "'")


async def process_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Separate the media attached to a message and reply with both stems.

    Steps: pick the attachment (audio, voice, video, video note or document),
    validate extension and size, download it into a temporary directory,
    extract the audio track when the input is a video, run Demucs, then send
    the vocals and accompaniment MP3 files back. Progress and errors are
    reported by editing a single status message.

    Args:
        update: Incoming Telegram update.
        context: Handler context; its ``bot`` is used to fetch the file.
    """
    # effective_message is also populated for edited messages, where
    # update.message is None and the original code raised AttributeError.
    message = update.effective_message
    if message is None:
        return

    # NOTE(review): all Arabic strings below were restored from mis-decoded
    # UTF-8; emoji and dashes whose bytes were lost were chosen from context.
    is_video = False
    if message.audio:
        file_obj = message.audio
        original_filename = message.audio.file_name or "audio.mp3"
    elif message.voice:
        file_obj = message.voice
        original_filename = "voice.ogg"
    elif message.video:
        file_obj = message.video
        original_filename = message.video.file_name or "video.mp4"
        is_video = True
    elif message.video_note:
        file_obj = message.video_note
        original_filename = "video_note.mp4"
        is_video = True
    elif message.document:
        file_obj = message.document
        original_filename = message.document.file_name or "file"
        ext = Path(original_filename).suffix.lower()
        if ext not in SUPPORTED_AUDIO_EXTENSIONS:
            # The extension is user-controlled. It is shown in a code span
            # (not bold) so characters such as "_" or "*" cannot break
            # Markdown parsing; "-" stands in for a missing extension.
            shown_ext = _inline_code_safe(ext) or "-"
            await message.reply_text(
                f"❌ الامتداد `{shown_ext}` غير مدعوم.\n\n"
                "الامتدادات المدعومة:\n"
                "`mp3, wav, flac, ogg, m4a, aac, wma, opus, aiff,\n"
                "mp4, mkv, webm, mov, avi, flv, ts, mts`",
                parse_mode=ParseMode.MARKDOWN,
            )
            return
        if ext in VIDEO_EXTENSIONS:
            is_video = True
    else:
        await message.reply_text(
            "⚠️ أرسل ملفاً صوتياً أو فيديو من فضلك.\n"
            "استخدم /help لمعرفة الامتدادات المدعومة."
        )
        return

    # NOTE(review): the hosted Bot API is documented to serve downloads only
    # up to 20 MB; files between that and MAX_FILE_SIZE_MB presumably fail at
    # get_file unless a local Bot API server is used. Confirm the deployment.
    if file_obj.file_size and file_obj.file_size > MAX_FILE_SIZE_MB * 1024 * 1024:
        await message.reply_text(
            f"❌ حجم الملف ({human_size(file_obj.file_size)}) يتجاوز الحد المسموح "
            f"({MAX_FILE_SIZE_MB} MB)."
        )
        return

    status_msg = await message.reply_text("⬇️ جاري تحميل الملف…")

    # Everything lives in a temp dir that is removed when the block exits.
    with tempfile.TemporaryDirectory() as tmpdir:
        try:
            tg_file = await context.bot.get_file(file_obj.file_id)
            suffix = Path(original_filename).suffix or ".mp3"
            input_path = os.path.join(tmpdir, f"input{suffix}")
            await tg_file.download_to_drive(input_path)

            if is_video:
                progress = "✅ تم التحميل.\n🎬 جاري استخراج الصوت…"
            else:
                progress = "✅ تم التحميل.\n🔄 جاري فصل الصوت (قد يستغرق بضع دقائق)…"
            await edit_or_reply(status_msg, progress)

            if is_video:
                wav_path = os.path.join(tmpdir, "extracted.wav")
                # Blocking subprocess work runs in a worker thread so the
                # event loop stays responsive.
                await asyncio.to_thread(extract_audio_from_video, input_path, wav_path)
                input_path = wav_path
                await edit_or_reply(
                    status_msg,
                    "✅ تم استخراج الصوت.\n🔄 جاري فصل الصوت (قد يستغرق بضع دقائق)…",
                )

            output_dir = os.path.join(tmpdir, "out")
            os.makedirs(output_dir, exist_ok=True)

            vocals_path, no_vocals_path = await asyncio.to_thread(
                run_demucs, input_path, output_dir
            )

            await edit_or_reply(status_msg, "✅ تم الفصل!\n📤 جاري إرسال الملفات…")

            base = Path(original_filename).stem

            with open(vocals_path, "rb") as f:
                await message.reply_audio(
                    audio=f,
                    filename=f"{base}_vocals.mp3",
                    title=f"{base} — صوت الغناء 🎤",
                    caption="🎤 *صوت الغناء / الكلام*",
                    parse_mode=ParseMode.MARKDOWN,
                )

            with open(no_vocals_path, "rb") as f:
                await message.reply_audio(
                    audio=f,
                    filename=f"{base}_instrumental.mp3",
                    title=f"{base} — الموسيقى 🎸",
                    caption="🎸 *الموسيقى / المصاحبة*",
                    parse_mode=ParseMode.MARKDOWN,
                )

            await edit_or_reply(status_msg, "✅ تم الإرسال بنجاح! أرسل ملفاً آخر متى شئت.")

        except RuntimeError as exc:
            # Raised by run_demucs / extract_audio_from_video with tool output.
            # The text is sanitised so stray backticks cannot make the error
            # report itself fail to send.
            await edit_or_reply(
                status_msg,
                f"❌ *حدث خطأ أثناء المعالجة:*\n`{_inline_code_safe(str(exc))}`",
                parse_mode=ParseMode.MARKDOWN,
            )
        except Exception as exc:
            # Top-level boundary for this handler: report instead of crashing.
            await edit_or_reply(
                status_msg,
                f"❌ *خطأ غير متوقع:*\n`{_inline_code_safe(str(exc))}`",
                parse_mode=ParseMode.MARKDOWN,
            )
|
|
|
|
| |
|
|
| |
# The single Application instance; created by main() and used by the Flask
# webhook view. None until main() has run.
telegram_app: "Application | None" = None


def main() -> None:
    """Configure the bot, register the webhook and serve it with Flask.

    Exits with an Arabic message when ``BOT_TOKEN`` is still the placeholder
    or ``WEBHOOK_URL`` is empty. Otherwise builds the Application, registers
    the command and media handlers, registers the webhook with Telegram and
    starts the Flask server on ``0.0.0.0:7860``.

    All asyncio work (initialisation, webhook registration, update
    processing) runs on one long-lived event loop in a background thread.
    The original opened and closed a new loop for every request, while the
    Application had already been initialised on another loop that
    ``asyncio.run`` had closed.

    Raises:
        SystemExit: If the token or webhook URL is not configured.
    """
    global telegram_app

    # NOTE(review): the Arabic strings and emoji below were restored from
    # mis-decoded UTF-8; confirm against the original wording.
    if BOT_TOKEN == "YOUR_BOT_TOKEN_HERE":
        raise SystemExit(
            "❌ ضع توكن البوت في متغير البيئة BOT_TOKEN أو عدّل السطر الأول من الكود."
        )

    if not WEBHOOK_URL:
        raise SystemExit(
            "❌ ضع رابط الـ Space في متغير البيئة WEBHOOK_URL\n"
            "مثال: https://osamaq1-stem-bot.hf.space"
        )

    telegram_app = Application.builder().token(BOT_TOKEN).build()
    telegram_app.add_handler(CommandHandler("start", start))
    telegram_app.add_handler(CommandHandler("help", help_command))

    media_filter = (
        filters.AUDIO
        | filters.VOICE
        | filters.VIDEO
        | filters.VIDEO_NOTE
        | filters.Document.ALL
    )
    telegram_app.add_handler(MessageHandler(media_filter, process_media))

    # One persistent loop for everything asyncio; Flask worker threads hand
    # coroutines to it instead of creating their own loops.
    bot_loop = asyncio.new_event_loop()
    threading.Thread(
        target=bot_loop.run_forever, name="telegram-bot-loop", daemon=True
    ).start()

    def report_failure(done) -> None:
        """Print the exception of a finished update-processing future, if any."""
        if done.cancelled():
            return
        exc = done.exception()
        if exc is not None:
            print(f"Update processing failed: {exc!r}")

    flask_app = Flask(__name__)

    @flask_app.route("/", methods=["GET"])
    def index():
        return "🤖 Bot is running!", 200

    @flask_app.route(f"/{BOT_TOKEN}", methods=["POST"])
    def webhook():
        """Telegram delivers updates to this endpoint."""
        update = Update.de_json(flask_request.get_json(force=True), telegram_app.bot)
        # Schedule and return at once: separation can take minutes, and a
        # slow webhook response makes Telegram time out and resend the update.
        pending = asyncio.run_coroutine_threadsafe(
            telegram_app.process_update(update), bot_loop
        )
        pending.add_done_callback(report_failure)
        return jsonify({"ok": True})

    async def set_webhook():
        await telegram_app.initialize()
        base_url = WEBHOOK_URL.rstrip("/")
        await telegram_app.bot.set_webhook(url=f"{base_url}/{BOT_TOKEN}")
        # The token is part of the URL path, so it is kept out of the logs.
        print(f"✅ Webhook set: {base_url}/<BOT_TOKEN>")

    # Block until the webhook is registered; startup errors propagate here.
    asyncio.run_coroutine_threadsafe(set_webhook(), bot_loop).result()

    print("🤖 البوت يعمل الآن عبر Webhook على Hugging Face…")
    flask_app.run(host="0.0.0.0", port=7860)


if __name__ == "__main__":
    main()
|
|