Aaaa / bot.py
Osamaq1's picture
Upload bot.py
72986a1 verified
import os
import asyncio
import tempfile
import subprocess
import shutil
from pathlib import Path
# --- ู…ูƒุชุจุงุช ุฅุถุงููŠุฉ ู„ุฅุจู‚ุงุก ุงู„ุจูˆุช ู…ุณุชูŠู‚ุธุงู‹ ---
from flask import Flask, request as flask_request, jsonify
import threading
from telegram import Update, Message
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
)
from telegram.constants import ParseMode
# โ”€โ”€โ”€ Configuration โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
BOT_TOKEN = os.environ.get("BOT_TOKEN", "YOUR_BOT_TOKEN_HERE")
# ุฑุงุจุท ุงู„ู€ Space ุนู„ู‰ Hugging Face โ€” ู…ุซุงู„:
# https://osamaq1-stem-bot.hf.space
WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "")
SUPPORTED_AUDIO_EXTENSIONS = {
".mp3", ".wav", ".flac", ".ogg", ".m4a", ".aac",
".wma", ".opus", ".aiff", ".aif", ".mp4", ".mkv",
".webm", ".mov", ".avi", ".flv", ".ts", ".mts",
}
MAX_FILE_SIZE_MB = 200
# โ”€โ”€โ”€ Helpers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
def human_size(num_bytes: int) -> str:
for unit in ("B", "KB", "MB", "GB"):
if abs(num_bytes) < 1024:
return f"{num_bytes:.1f} {unit}"
num_bytes /= 1024
return f"{num_bytes:.1f} TB"
async def edit_or_reply(msg: Message, text: str, **kwargs) -> Message:
try:
return await msg.edit_text(text, **kwargs)
except Exception:
return await msg.reply_text(text, **kwargs)
def run_demucs(input_path: str, output_dir: str) -> tuple[str, str]:
cmd = [
"python", "-m", "demucs",
"--two-stems", "vocals",
"-n", "htdemucs",
"--mp3",
"-o", output_dir,
input_path,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(result.stderr[-2000:] or "Demucs failed with no output")
stem_dir = Path(output_dir) / "htdemucs" / Path(input_path).stem
vocals = stem_dir / "vocals.mp3"
no_vocals = stem_dir / "no_vocals.mp3"
if not vocals.exists() or not no_vocals.exists():
raise RuntimeError(
f"Expected output files not found in {stem_dir}.\n"
f"Directory contents: {list(stem_dir.iterdir()) if stem_dir.exists() else 'dir missing'}"
)
return str(vocals), str(no_vocals)
def extract_audio_from_video(video_path: str, out_wav: str) -> None:
cmd = [
"ffmpeg", "-y", "-i", video_path,
"-vn", "-ar", "44100", "-ac", "2",
"-c:a", "pcm_s16le", out_wav,
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"ffmpeg audio extraction failed:\n{result.stderr[-1500:]}")
VIDEO_EXTENSIONS = {".mp4", ".mkv", ".webm", ".mov", ".avi", ".flv", ".ts", ".mts"}
# โ”€โ”€โ”€ Command Handlers โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
text = (
"๐ŸŽต *ู…ุฑุญุจุงู‹ ุจูƒ ููŠ ุจูˆุช ูุตู„ ุงู„ุตูˆุช!*\n\n"
"ุฃุฑุณู„ ู„ูŠ ุฃูŠ ู…ู„ู ุตูˆุชูŠ ุฃูˆ ููŠุฏูŠูˆ ูˆุณุฃูุตู„:\n"
" ๐ŸŽค *ุตูˆุช ุงู„ุบู†ุงุก / ุงู„ูƒู„ุงู…* โ†’ `vocals.mp3`\n"
" ๐ŸŽธ *ุงู„ู…ูˆุณูŠู‚ู‰ / ุงู„ู…ุตุงุญุจุฉ* โ†’ `no_vocals.mp3`\n\n"
"๐Ÿ“ *ุงู„ุงู…ุชุฏุงุฏุงุช ุงู„ู…ุฏุนูˆู…ุฉ:*\n"
"`mp3, wav, flac, ogg, m4a, aac, wma, opus, aiff,\n"
"mp4, mkv, webm, mov, avi, flv, ts, mts`\n\n"
f"โš ๏ธ ุงู„ุญุฏ ุงู„ุฃู‚ุตู‰ ู„ุญุฌู… ุงู„ู…ู„ู: *{MAX_FILE_SIZE_MB} MB*\n\n"
"ุงุณุชุฎุฏู… /help ู„ู…ุฒูŠุฏ ู…ู† ุงู„ู…ุนู„ูˆู…ุงุช."
)
await update.message.reply_text(text, parse_mode=ParseMode.MARKDOWN)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
text = (
"๐Ÿ“– *ูƒูŠู ุฃุณุชุฎุฏู… ุงู„ุจูˆุชุŸ*\n\n"
"1๏ธโƒฃ ุฃุฑุณู„ ู…ู„ูุงู‹ ุตูˆุชูŠุงู‹ ุฃูˆ ู…ู‚ุทุน ููŠุฏูŠูˆ.\n"
"2๏ธโƒฃ ุงู†ุชุธุฑ ู‚ู„ูŠู„ุงู‹ (ุงู„ูุตู„ ูŠุฃุฎุฐ 1โ€“5 ุฏู‚ุงุฆู‚ ุญุณุจ ุทูˆู„ ุงู„ู…ู‚ุทุน).\n"
"3๏ธโƒฃ ุณุชุตู„ูƒ ู…ู„ูุงู†:\n"
" โ€ข `vocals.mp3` โ€” ุงู„ุบู†ุงุก / ุงู„ูƒู„ุงู… ูู‚ุท\n"
" โ€ข `no_vocals.mp3` โ€” ุงู„ู…ูˆุณูŠู‚ู‰ / ุงู„ู…ุตุงุญุจุฉ ูู‚ุท\n\n"
"๐Ÿ”ง *ุงู„ู†ู…ูˆุฐุฌ ุงู„ู…ุณุชุฎุฏู…:* htdemucs (Meta AI)\n"
"๐Ÿ“Œ *ู…ู„ุงุญุธุฉ:* ุฌูˆุฏุฉ ุงู„ูุตู„ ุชุนุชู…ุฏ ุนู„ู‰ ูˆุถูˆุญ ุงู„ุตูˆุช ุงู„ุฃุตู„ูŠ."
)
await update.message.reply_text(text, parse_mode=ParseMode.MARKDOWN)
# โ”€โ”€โ”€ Core Processing โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
async def process_media(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
message = update.message
file_obj = None
original_filename = "audio"
is_video = False
if message.audio:
file_obj = message.audio
original_filename = message.audio.file_name or "audio.mp3"
elif message.voice:
file_obj = message.voice
original_filename = "voice.ogg"
elif message.video:
file_obj = message.video
original_filename = message.video.file_name or "video.mp4"
is_video = True
elif message.video_note:
file_obj = message.video_note
original_filename = "video_note.mp4"
is_video = True
elif message.document:
file_obj = message.document
original_filename = message.document.file_name or "file"
ext = Path(original_filename).suffix.lower()
if ext not in SUPPORTED_AUDIO_EXTENSIONS:
await message.reply_text(
f"โŒ ุงู„ุงู…ุชุฏุงุฏ *{ext}* ุบูŠุฑ ู…ุฏุนูˆู….\n\n"
"ุงู„ุงู…ุชุฏุงุฏุงุช ุงู„ู…ุฏุนูˆู…ุฉ:\n"
"`mp3, wav, flac, ogg, m4a, aac, wma, opus, aiff,\n"
"mp4, mkv, webm, mov, avi, flv, ts, mts`",
parse_mode=ParseMode.MARKDOWN,
)
return
if ext in VIDEO_EXTENSIONS:
is_video = True
else:
await message.reply_text(
"โš ๏ธ ุฃุฑุณู„ ู…ู„ูุงู‹ ุตูˆุชูŠุงู‹ ุฃูˆ ููŠุฏูŠูˆ ู…ู† ูุถู„ูƒ.\n"
"ุงุณุชุฎุฏู… /help ู„ู…ุนุฑูุฉ ุงู„ุงู…ุชุฏุงุฏุงุช ุงู„ู…ุฏุนูˆู…ุฉ."
)
return
if file_obj.file_size and file_obj.file_size > MAX_FILE_SIZE_MB * 1024 * 1024:
await message.reply_text(
f"โŒ ุญุฌู… ุงู„ู…ู„ู ({human_size(file_obj.file_size)}) ูŠุชุฌุงูˆุฒ ุงู„ุญุฏ ุงู„ู…ุณู…ูˆุญ "
f"({MAX_FILE_SIZE_MB} MB)."
)
return
status_msg = await message.reply_text("โฌ‡๏ธ ุฌุงุฑู ุชุญู…ูŠู„ ุงู„ู…ู„ูโ€ฆ")
with tempfile.TemporaryDirectory() as tmpdir:
try:
tg_file = await context.bot.get_file(file_obj.file_id)
suffix = Path(original_filename).suffix or ".mp3"
input_path = os.path.join(tmpdir, f"input{suffix}")
await tg_file.download_to_drive(input_path)
await edit_or_reply(
status_msg,
"โœ… ุชู… ุงู„ุชุญู…ูŠู„.\n"
"๐ŸŽฌ ุฌุงุฑู ุงุณุชุฎุฑุงุฌ ุงู„ุตูˆุชโ€ฆ" if is_video else
"โœ… ุชู… ุงู„ุชุญู…ูŠู„.\n๐Ÿ”„ ุฌุงุฑู ูุตู„ ุงู„ุตูˆุช (ู‚ุฏ ูŠุณุชุบุฑู‚ ุจุถุน ุฏู‚ุงุฆู‚)โ€ฆ",
)
if is_video:
wav_path = os.path.join(tmpdir, "extracted.wav")
await asyncio.get_event_loop().run_in_executor(
None, extract_audio_from_video, input_path, wav_path
)
input_path = wav_path
await edit_or_reply(
status_msg,
"โœ… ุชู… ุงุณุชุฎุฑุงุฌ ุงู„ุตูˆุช.\n๐Ÿ”„ ุฌุงุฑู ูุตู„ ุงู„ุตูˆุช (ู‚ุฏ ูŠุณุชุบุฑู‚ ุจุถุน ุฏู‚ุงุฆู‚)โ€ฆ",
)
output_dir = os.path.join(tmpdir, "out")
os.makedirs(output_dir, exist_ok=True)
vocals_path, no_vocals_path = await asyncio.get_event_loop().run_in_executor(
None, run_demucs, input_path, output_dir
)
await edit_or_reply(status_msg, "โœ… ุชู… ุงู„ูุตู„!\n๐Ÿ“ค ุฌุงุฑู ุฅุฑุณุงู„ ุงู„ู…ู„ูุงุชโ€ฆ")
base = Path(original_filename).stem
with open(vocals_path, "rb") as f:
await message.reply_audio(
audio=f,
filename=f"{base}_vocals.mp3",
title=f"{base} โ€” ุตูˆุช ุงู„ุบู†ุงุก ๐ŸŽค",
caption="๐ŸŽค *ุตูˆุช ุงู„ุบู†ุงุก / ุงู„ูƒู„ุงู…*",
parse_mode=ParseMode.MARKDOWN,
)
with open(no_vocals_path, "rb") as f:
await message.reply_audio(
audio=f,
filename=f"{base}_instrumental.mp3",
title=f"{base} โ€” ุงู„ู…ูˆุณูŠู‚ู‰ ๐ŸŽธ",
caption="๐ŸŽธ *ุงู„ู…ูˆุณูŠู‚ู‰ / ุงู„ู…ุตุงุญุจุฉ*",
parse_mode=ParseMode.MARKDOWN,
)
await edit_or_reply(status_msg, "โœ… ุชู… ุงู„ุฅุฑุณุงู„ ุจู†ุฌุงุญ! ุฃุฑุณู„ ู…ู„ูุงู‹ ุขุฎุฑ ู…ุชู‰ ุดุฆุช.")
except RuntimeError as exc:
await edit_or_reply(
status_msg,
f"โŒ *ุญุฏุซ ุฎุทุฃ ุฃุซู†ุงุก ุงู„ู…ุนุงู„ุฌุฉ:*\n`{str(exc)[:800]}`",
parse_mode=ParseMode.MARKDOWN,
)
except Exception as exc:
await edit_or_reply(
status_msg,
f"โŒ *ุฎุทุฃ ุบูŠุฑ ู…ุชูˆู‚ุน:*\n`{str(exc)[:800]}`",
parse_mode=ParseMode.MARKDOWN,
)
# โ”€โ”€โ”€ Entry Point โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
# ู…ุชุบูŠุฑ ุนุงู… ู„ู„ุชุทุจูŠู‚ ุญุชู‰ ูŠุณุชุฎุฏู…ู‡ Flask
telegram_app: Application = None
def main() -> None:
global telegram_app
if BOT_TOKEN == "YOUR_BOT_TOKEN_HERE":
raise SystemExit(
"โŒ ุถุน ุชูˆูƒู† ุงู„ุจูˆุช ููŠ ู…ุชุบูŠุฑ ุงู„ุจูŠุฆุฉ BOT_TOKEN ุฃูˆ ุนุฏู‘ู„ ุงู„ุณุทุฑ ุงู„ุฃูˆู„ ู…ู† ุงู„ูƒูˆุฏ."
)
if not WEBHOOK_URL:
raise SystemExit(
"โŒ ุถุน ุฑุงุจุท ุงู„ู€ Space ููŠ ู…ุชุบูŠุฑ ุงู„ุจูŠุฆุฉ WEBHOOK_URL\n"
"ู…ุซุงู„: https://osamaq1-stem-bot.hf.space"
)
# ุจู†ุงุก ุชุทุจูŠู‚ ุชู„ุฌุฑุงู…
telegram_app = Application.builder().token(BOT_TOKEN).build()
telegram_app.add_handler(CommandHandler("start", start))
telegram_app.add_handler(CommandHandler("help", help_command))
media_filter = (
filters.AUDIO
| filters.VOICE
| filters.VIDEO
| filters.VIDEO_NOTE
| filters.Document.ALL
)
telegram_app.add_handler(MessageHandler(media_filter, process_media))
# โ”€โ”€โ”€ Flask โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
flask_app = Flask(__name__)
@flask_app.route("/", methods=["GET"])
def index():
return "๐Ÿค– Bot is running!", 200
@flask_app.route(f"/{BOT_TOKEN}", methods=["POST"])
def webhook():
"""ุชู„ุฌุฑุงู… ูŠุฑุณู„ ุงู„ุชุญุฏูŠุซุงุช ู‡ู†ุง."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async def process():
await telegram_app.initialize()
update = Update.de_json(flask_request.get_json(force=True), telegram_app.bot)
await telegram_app.process_update(update)
loop.run_until_complete(process())
loop.close()
return jsonify({"ok": True})
# ุถุจุท ุงู„ู€ Webhook ุนู†ุฏ ุจุฏุก ุงู„ุชุดุบูŠู„
async def set_webhook():
await telegram_app.initialize()
webhook_full = f"{WEBHOOK_URL.rstrip('/')}/{BOT_TOKEN}"
await telegram_app.bot.set_webhook(url=webhook_full)
print(f"โœ… Webhook set: {webhook_full}")
asyncio.run(set_webhook())
print("๐Ÿค– ุงู„ุจูˆุช ูŠุนู…ู„ ุงู„ุขู† ุนุจุฑ Webhook ุนู„ู‰ Hugging Faceโ€ฆ")
flask_app.run(host="0.0.0.0", port=7860)
if __name__ == "__main__":
main()