Spaces:
Sleeping
Sleeping
| # Copyright (C) @TheSmartBisnu | |
| # Channel: https://t.me/itsSmartDev | |
| # | |
| # Single-file webhook bot — zero local module imports. | |
| # All helpers are inlined so HuggingFace Spaces Docker can run: | |
| # uvicorn main:app --host 0.0.0.0 --port 7860 | |
| import os | |
| import sys | |
| import shutil | |
| import psutil | |
| import asyncio | |
| import logging | |
| import traceback | |
| from time import time | |
| from typing import Optional | |
| from contextlib import asynccontextmanager | |
| from logging.handlers import RotatingFileHandler | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import JSONResponse | |
| from pyrogram import Client | |
| from pyrogram.enums import ParseMode | |
| from pyrogram.errors import PeerIdInvalid, BadRequest, FloodWait | |
| from pyrogram.parser import Parser | |
| from pyrogram.utils import get_channel_id | |
| from pyrogram.types import InputMediaPhoto, InputMediaVideo, InputMediaAudio, InputMediaDocument | |
| from pyleaves import Leaves | |
| # ============================================================ | |
| # Logging | |
| # ============================================================ | |
# Start every run with a fresh log file. Failing to delete the old one
# (for example because it does not exist yet) is deliberately ignored.
try:
    os.remove("logs.txt")
except Exception:
    pass

_LOG_FORMAT = "[%(asctime)s - %(levelname)s] - %(funcName)s() - Line %(lineno)d: %(name)s - %(message)s"
_LOG_DATE_FORMAT = "%d-%b-%y %I:%M:%S %p"

# Root logger: INFO and above go to both the rotating log file and the console.
logging.basicConfig(
    handlers=[
        RotatingFileHandler("logs.txt", mode="w+", maxBytes=5_000_000, backupCount=10),
        logging.StreamHandler(),
    ],
    datefmt=_LOG_DATE_FORMAT,
    format=_LOG_FORMAT,
    level=logging.INFO,
)

# Only errors from the "pyrogram" logger are let through.
_pyrogram_logger = logging.getLogger("pyrogram")
_pyrogram_logger.setLevel(logging.ERROR)
def LOGGER(name: str) -> logging.Logger:
    """Return the logger registered under ``name``.

    Thin convenience wrapper around ``logging.getLogger``.
    """
    named_logger = logging.getLogger(name)
    return named_logger
| # ============================================================ | |
| # Config | |
| # ============================================================ | |
# Load the env files in this order. Any error raised while loading is
# ignored (and stops further loading), exactly as a best-effort step.
try:
    for _env_file in ("config.env.local", "config.env"):
        load_dotenv(_env_file)
except Exception:
    pass

# Abort start-up when a required setting is unset or empty.
_missing = [
    _required
    for _required in ("BOT_TOKEN", "SESSION_STRING", "API_ID", "API_HASH")
    if not os.getenv(_required)
]
if _missing:
    print(f"ERROR: Missing required env vars: {', '.join(_missing)}")
    sys.exit(1)
def _int_from_env(name: str, fallback: str) -> int:
    """Read environment variable ``name`` as an int, using ``fallback`` when it is unset."""
    return int(os.getenv(name, fallback))


class PyroConf:
    """Bot configuration, read from the environment when the class body runs."""

    # Telegram credentials.
    API_ID = _int_from_env("API_ID", "6")
    API_HASH = os.getenv("API_HASH", "eb06d4abfb49dc3eeb1aeb98ae0f581e")
    BOT_TOKEN = os.getenv("BOT_TOKEN")
    SESSION_STRING = os.getenv("SESSION_STRING")

    # Timestamp captured when this class is defined; used to report uptime.
    BOT_START_TIME = time()

    # Download tuning knobs.
    MAX_CONCURRENT_DOWNLOADS = _int_from_env("MAX_CONCURRENT_DOWNLOADS", "1")
    BATCH_SIZE = _int_from_env("BATCH_SIZE", "1")
    FLOOD_WAIT_DELAY = _int_from_env("FLOOD_WAIT_DELAY", "10")
| # ============================================================ | |
| # File helpers | |
| # ============================================================ | |
# Unit suffixes walked by get_readable_file_size, smallest first; that
# function divides by 1024 each time it moves to the next entry.
SIZE_UNITS = ["B", "KB", "MB", "GB", "TB", "PB"]
def get_download_path(folder_id: int, filename: str, root_dir: str = "downloads") -> str:
    """Return the path for ``filename`` inside ``root_dir/<folder_id>``, creating the folder.

    ``filename`` may originate from a remote message, so path separators are
    replaced with ``_`` and the special names ``""``, ``"."`` and ``".."``
    fall back to ``"file"``. This keeps the returned path inside the
    per-request folder instead of letting ``../`` components escape it.

    Args:
        folder_id: Identifier used as the sub-folder name.
        filename: Desired file name (ordinary names are used unchanged).
        root_dir: Directory under which the sub-folder is created.

    Returns:
        The joined path ``root_dir/<folder_id>/<safe filename>``.
    """
    folder = os.path.join(root_dir, str(folder_id))
    os.makedirs(folder, exist_ok=True)

    safe_name = filename.replace(os.sep, "_")
    if os.altsep:
        safe_name = safe_name.replace(os.altsep, "_")
    if safe_name in ("", ".", ".."):
        safe_name = "file"
    return os.path.join(folder, safe_name)
def cleanup_download(path: str) -> None:
    """Delete a downloaded file, its ``.temp`` sibling, and the folder if left empty.

    Any failure is logged rather than raised.
    """

    def _remove_if_present(target: str) -> None:
        if os.path.exists(target):
            os.remove(target)

    try:
        _remove_if_present(path)
        _remove_if_present(path + ".temp")
        parent = os.path.dirname(path)
        # Only drop the containing folder once nothing else lives in it.
        if os.path.isdir(parent) and not os.listdir(parent):
            os.rmdir(parent)
    except Exception as e:
        LOGGER(__name__).error(f"Cleanup failed for {path}: {e}")
def cleanup_downloads_root(root_dir: str = "downloads") -> tuple:
    """Remove ``root_dir`` entirely and report what was there.

    Returns:
        ``(file_count, total_size)`` — the number of files found and the sum
        of the sizes that could be read. ``(0, 0)`` when ``root_dir`` is not
        a directory.
    """
    if not os.path.isdir(root_dir):
        return 0, 0

    seen_files, seen_bytes = 0, 0
    for current_dir, _subdirs, names in os.walk(root_dir):
        # Every file counts, even when its size cannot be read.
        seen_files += len(names)
        for entry in names:
            try:
                seen_bytes += os.path.getsize(os.path.join(current_dir, entry))
            except OSError:
                continue

    shutil.rmtree(root_dir, ignore_errors=True)
    return seen_files, seen_bytes
def get_readable_file_size(size_in_bytes: Optional[float]) -> str:
    """Format a byte count with two decimals and a unit from ``SIZE_UNITS``.

    ``None`` and negative values yield ``"0B"``; values too big for the
    largest unit yield ``"File too large"``.
    """
    if size_in_bytes is None or size_in_bytes < 0:
        return "0B"

    remaining = size_in_bytes
    for suffix in SIZE_UNITS:
        if remaining < 1024:
            return f"{remaining:.2f} {suffix}"
        remaining /= 1024
    return "File too large"
def get_readable_time(seconds: int) -> str:
    """Render a duration in seconds as e.g. ``1d2h3m4s``.

    Day, hour and minute parts are omitted when zero; the seconds part is
    always present.
    """
    remaining = seconds
    pieces = []
    for suffix, span in (("d", 86400), ("h", 3600), ("m", 60)):
        count, remaining = divmod(remaining, span)
        if int(count):
            pieces.append(f"{int(count)}{suffix}")
    pieces.append(f"{int(remaining)}s")
    return "".join(pieces)
async def fileSizeLimit(file_size, message, action_type="download", is_premium=False):
    """Check ``file_size`` against the size cap and tell the user when it is exceeded.

    The cap is 2_097_152_000 bytes, doubled when ``is_premium`` is true.

    Returns:
        ``True`` when the file is within the cap; otherwise replies on
        ``message`` and returns ``False``.
    """
    limit = 2_097_152_000
    if is_premium:
        limit = 2 * limit

    if not file_size > limit:
        return True

    await message.reply(
        f"The file size exceeds the {get_readable_file_size(limit)} "
        f"limit and cannot be {action_type}ed.")
    return False
| # ============================================================ | |
| # Message / link helpers | |
| # ============================================================ | |
async def get_parsed_msg(text, entities):
    """Run ``Parser.unparse`` on ``text`` with its entities (``is_html=False``).

    A missing/empty ``entities`` value is passed on as an empty list.
    """
    entity_list = entities if entities else []
    return Parser.unparse(text, entity_list, is_html=False)
def getChatMsgID(link: str):
    """Split a Telegram post link into ``(chat_id, message_id)``.

    The link is split on ``/`` and interpreted by the number of parts:

    * 7 parts with ``c`` at index 3: numeric channel id at index 4, message id at index 6.
    * 6 parts: either ``c``/channel id/message id, or a username at index 3
      with the message id at index 5.
    * 5 parts: username at index 3, message id at index 4.

    Args:
        link: The post URL.

    Returns:
        ``(chat_id, message_id)`` where ``chat_id`` is a username string or
        the value returned by ``get_channel_id``.

    Raises:
        ValueError: For ``/m/`` links, for non-numeric ids, and for links
            that yield no chat or message id.
    """
    linkps = link.split("/")

    # Checked before the try block: raising it inside would let the generic
    # handler below overwrite this specific message.
    if len(linkps) == 5 and linkps[3] == "m":
        raise ValueError("Invalid ClientType used to parse this message link")

    chat_id = message_id = None
    try:
        if len(linkps) == 7 and linkps[3] == "c":
            chat_id = get_channel_id(int(linkps[4]))
            message_id = int(linkps[6])
        elif len(linkps) == 6:
            if linkps[3] == "c":
                chat_id = get_channel_id(int(linkps[4]))
            else:
                chat_id = linkps[3]
            message_id = int(linkps[5])
        elif len(linkps) == 5:
            chat_id = linkps[3]
            message_id = int(linkps[4])
    except (ValueError, TypeError) as err:
        raise ValueError("Invalid post URL. Must end with a numeric ID.") from err

    if not chat_id or not message_id:
        raise ValueError("Please send a valid Telegram post URL.")
    return chat_id, message_id
def get_file_name(message_id: int, chat_message) -> str:
    """Pick a file name for the media carried by ``chat_message``.

    The media's own ``file_name`` is used when present (document, video,
    audio, animation); otherwise the name is ``message_id`` plus an extension
    chosen by media kind. Messages without recognised media give the bare
    ``message_id``.
    """
    fallback = f"{message_id}"

    if chat_message.document:
        return chat_message.document.file_name or fallback
    if chat_message.video:
        return chat_message.video.file_name or f"{message_id}.mp4"
    if chat_message.audio:
        return chat_message.audio.file_name or f"{message_id}.mp3"
    if chat_message.voice:
        return f"{message_id}.ogg"
    if chat_message.video_note:
        return f"{message_id}.mp4"
    if chat_message.animation:
        return chat_message.animation.file_name or f"{message_id}.gif"
    if chat_message.sticker:
        sticker = chat_message.sticker
        if sticker.is_animated:
            extension = "tgs"
        elif sticker.is_video:
            extension = "webm"
        else:
            extension = "webp"
        return f"{message_id}.{extension}"
    if chat_message.photo:
        return f"{message_id}.jpg"
    return fallback
| # ============================================================ | |
| # Media helpers | |
| # ============================================================ | |
def progressArgs(action: str, progress_message, start_time: float):
    """Bundle the three values into the tuple handed over as ``progress_args``."""
    bundled = action, progress_message, start_time
    return bundled
async def send_media(bot_client, message, media_path, media_type,
                     caption, progress_message, start_time):
    """Upload ``media_path`` to the chat of ``message`` as a reply.

    ``media_type`` selects the ``send_photo`` / ``send_video`` /
    ``send_audio`` / ``send_document`` method of ``bot_client``; unknown
    types go through ``send_document``. A ``FloodWait`` with a positive wait
    is retried once after sleeping; any further ``FloodWait`` is re-raised.
    ``progress_message`` and ``start_time`` are accepted but not used here.
    """
    target_chat = message.chat.id
    reply_id = message.id
    senders = {
        "photo": bot_client.send_photo,
        "video": bot_client.send_video,
        "audio": bot_client.send_audio,
        "document": bot_client.send_document,
    }
    upload = senders.get(media_type, bot_client.send_document)

    already_retried = False
    while True:
        try:
            await upload(target_chat, media_path, caption=caption or "",
                         reply_to_message_id=reply_id)
            return
        except FloodWait as flood:
            delay = int(getattr(flood, "value", 0) or 0)
            if already_retried or delay <= 0:
                raise
            already_retried = True
            await asyncio.sleep(delay + 1)
async def processMediaGroup(chat_message, bot_client, message) -> bool:
    """Download every item of a media group and re-send it to the requester.

    The group is fetched through the module-level ``user`` client, each item
    is downloaded, and the collected items are sent with
    ``bot_client.send_media_group``. If that call fails, every downloaded
    file is sent individually as a document instead. Downloaded files are
    removed afterwards.

    Returns:
        ``False`` when the group could not be fetched, was empty, or no item
        could be prepared; ``True`` otherwise.
    """
    target_chat = message.chat.id
    reply_id = message.id

    try:
        group_items = await user.get_media_group(
            chat_message.chat.id, chat_message.id)
    except Exception as e:
        LOGGER(__name__).error(f"Failed to get media group: {e}")
        return False
    if not group_items:
        return False

    saved_files = []
    album = []
    for position, item in enumerate(group_items):
        try:
            item_name = get_file_name(item.id, item)
            destination = get_download_path(reply_id, f"grp_{position}_{item_name}")
            saved = await item.download(file_name=destination)
            if not saved:
                continue
            saved_files.append(saved)

            # Only the first item of the group carries the caption.
            item_caption = ""
            if position == 0 and (item.caption or item.text):
                item_caption = Parser.unparse(
                    item.caption or item.text or "",
                    item.caption_entities or item.entities or [],
                    is_html=False)

            if item.photo:
                wrapper = InputMediaPhoto
            elif item.video:
                wrapper = InputMediaVideo
            elif item.audio:
                wrapper = InputMediaAudio
            else:
                wrapper = InputMediaDocument
            album.append(wrapper(saved, caption=item_caption))
        except Exception as e:
            LOGGER(__name__).error(f"Error downloading group item {item.id}: {e}")

    if not album:
        for leftover in saved_files:
            cleanup_download(leftover)
        return False

    try:
        await bot_client.send_media_group(target_chat, album,
                                          reply_to_message_id=reply_id)
    except Exception as e:
        LOGGER(__name__).warning(f"send_media_group failed ({e}), sending individually")
        # Fallback: deliver each file on its own as a plain document.
        for single in saved_files:
            try:
                await bot_client.send_document(target_chat, single,
                                               reply_to_message_id=reply_id)
            except Exception as ie:
                LOGGER(__name__).error(f"Individual send failed: {ie}")

    for leftover in saved_files:
        cleanup_download(leftover)
    return True
| # ============================================================ | |
| # Pyrogram clients | |
| # ============================================================ | |
# Options common to both Pyrogram clients.
_shared_client_options = dict(
    api_id=PyroConf.API_ID,
    api_hash=PyroConf.API_HASH,
    workers=100,
    max_concurrent_transmissions=1,
    sleep_threshold=30,
)

# Bot account: authenticates with the bot token and defaults to Markdown parsing.
bot = Client(
    "media_bot",
    bot_token=PyroConf.BOT_TOKEN,
    parse_mode=ParseMode.MARKDOWN,
    **_shared_client_options,
)

# User account: authenticates with the session string.
user = Client(
    "user_session",
    session_string=PyroConf.SESSION_STRING,
    **_shared_client_options,
)
| # ============================================================ | |
| # Global state | |
| # ============================================================ | |
# Tasks created through track_task(); each one is discarded from the set by
# its done-callback, and handle_killall() cancels whatever is still in it.
RUNNING_TASKS: set = set()
# Gate used by handle_download(); stays None until lifespan() replaces it
# with an asyncio.Semaphore sized by PyroConf.MAX_CONCURRENT_DOWNLOADS.
download_semaphore = None
def track_task(coro):
    """Schedule ``coro`` as a task and keep it in ``RUNNING_TASKS`` until it finishes.

    Returns:
        The created ``asyncio.Task``.
    """
    scheduled = asyncio.create_task(coro)
    RUNNING_TASKS.add(scheduled)
    # The task drops itself from the registry once done.
    scheduled.add_done_callback(RUNNING_TASKS.discard)
    return scheduled
| # ============================================================ | |
| # Webhook response helpers | |
| # ============================================================ | |
def _msg(chat_id: int, text: str, parse_mode: str = "Markdown",
         reply_markup: dict = None, disable_web_page_preview: bool = False) -> dict:
    """Build a ``sendMessage`` payload to return as the webhook response body.

    ``reply_markup`` and ``disable_web_page_preview`` are only added to the
    payload when truthy.
    """
    body = dict(method="sendMessage", chat_id=chat_id,
                text=text, parse_mode=parse_mode)
    if reply_markup:
        body.update(reply_markup=reply_markup)
    if disable_web_page_preview:
        body.update(disable_web_page_preview=True)
    return body
def _update_channel_markup() -> dict:
    """Return an inline keyboard holding a single "Update Channel" URL button."""
    channel_button = {"text": "Update Channel", "url": "https://t.me/itsSmartDev"}
    single_row = [channel_button]
    return {"inline_keyboard": [single_row]}
| # ============================================================ | |
| # Instant command handlers (returned as webhook reply body) | |
| # ============================================================ | |
def handle_start(chat_id: int) -> dict:
    """Build the welcome message payload, with the update-channel button attached."""
    welcome_text = (
        "π **Welcome to Media Downloader Bot!**\n\n"
        "I can grab photos, videos, audio, and documents from any Telegram post.\n"
        "Just send me a link (paste it directly or use `/dl <link>`).\n\n"
        "βΉοΈ Use `/help` to view all commands.\n"
        "π Make sure the user client is part of the chat.\n\n"
        "Ready? Send me a Telegram post link!"
    )
    keyboard = _update_channel_markup()
    return _msg(chat_id, welcome_text,
                reply_markup=keyboard, disable_web_page_preview=True)
def handle_help(chat_id: int) -> dict:
    """Build the help message payload listing the bot's commands."""
    help_text = (
        "π‘ **Media Downloader Bot Help**\n\n"
        "β€ **Download Media**\n"
        " β `/dl <post_URL>` or paste a link directly.\n\n"
        "β€ **Batch Download**\n"
        " β `/bdl start_link end_link`\n"
        " π‘ Example: `/bdl https://t.me/mychannel/100 https://t.me/mychannel/120`\n\n"
        "β€ `/killall` β cancel pending downloads\n"
        "β€ `/logs` β get bot log file\n"
        "β€ `/cleanup` β remove temp files\n"
        "β€ `/stats` β show system stats\n\n"
        "**Example**: `/dl https://t.me/itsSmartDev/547`"
    )
    keyboard = _update_channel_markup()
    return _msg(chat_id, help_text,
                reply_markup=keyboard, disable_web_page_preview=True)
def handle_stats(chat_id: int) -> dict:
    """Build the status message: uptime, disk, process memory, network and load figures.

    Note: ``psutil.cpu_percent(interval=0.2)`` is called here with a 0.2 s
    sampling interval.
    """
    uptime = get_readable_time(int(time() - PyroConf.BOT_START_TIME))
    disk_total, disk_used, disk_free = shutil.disk_usage(".")
    net_sent = get_readable_file_size(psutil.net_io_counters().bytes_sent)
    net_recv = get_readable_file_size(psutil.net_io_counters().bytes_recv)
    cpu_load = psutil.cpu_percent(interval=0.2)
    ram_load = psutil.virtual_memory().percent
    disk_load = psutil.disk_usage("/").percent
    this_process = psutil.Process(os.getpid())

    report_lines = [
        "**β§ββ‘ββ¦ Bot is Up and Running successfully.**\n\n",
        f"**β Uptime:** `{uptime}`\n",
        f"**β Disk:** `{get_readable_file_size(disk_total)}` total ",
        f"`{get_readable_file_size(disk_used)}` used ",
        f"`{get_readable_file_size(disk_free)}` free\n",
        f"**β Memory:** `{round(this_process.memory_info()[0] / 1024**2)} MiB`\n",
        f"**β Net:** β`{net_sent}` β`{net_recv}`\n",
        f"**β CPU:** `{cpu_load}%` **RAM:** `{ram_load}%` **DISK:** `{disk_load}%`",
    ]
    return _msg(chat_id, "".join(report_lines))
def handle_killall(chat_id: int) -> dict:
    """Cancel every unfinished task in ``RUNNING_TASKS`` and report how many were cancelled."""
    cancelled = 0
    # Iterate over a snapshot of the registry.
    for pending in list(RUNNING_TASKS):
        if pending.done():
            continue
        if pending.cancel():
            cancelled += 1
    return _msg(chat_id, f"**Cancelled {cancelled} running task(s).**")
def handle_cleanup(chat_id: int) -> dict:
    """Wipe the local downloads folder and report how much was removed.

    Any error is logged and turned into a failure message instead of being raised.
    """
    try:
        removed, freed = cleanup_downloads_root()
        if removed:
            return _msg(chat_id,
                        f"π§Ή **Cleanup complete:** removed `{removed}` file(s), "
                        f"freed `{get_readable_file_size(freed)}`.")
        return _msg(chat_id, "π§Ή **Cleanup complete:** no local downloads found.")
    except Exception as e:
        LOGGER(__name__).error(f"Cleanup failed: {e}")
        return _msg(chat_id, "β **Cleanup failed.** Check logs for details.")
| # ============================================================ | |
| # Message proxy | |
| # ============================================================ | |
class _MsgProxy:
    """Minimal stand-in for a message object, built from webhook ids.

    Exposes ``id``, ``chat.id``, ``reply`` and ``reply_document``; replies are
    sent through the module-level ``bot`` client as replies to ``id``.
    """

    def __init__(self, cid: int, rid: int):
        chat_type = type("_Chat", (), {"id": cid})
        self.chat = chat_type()
        self.id = rid

    async def reply(self, text, **kw):
        """Send ``text`` to the chat. Extra keyword arguments are not forwarded."""
        kw.pop("reply_markup", None)
        target_chat = self.chat.id
        await bot.send_message(target_chat, text,
                               reply_to_message_id=self.id)

    async def reply_document(self, document, caption="", **kw):
        """Send ``document`` with ``caption`` to the chat. Extra keyword arguments are not forwarded."""
        target_chat = self.chat.id
        await bot.send_document(target_chat, document, caption=caption,
                                reply_to_message_id=self.id)
| # ============================================================ | |
| # Async download workers | |
| # ============================================================ | |
async def handle_download(chat_id: int, reply_to_id: int, post_url: str):
    """Fetch the post behind ``post_url`` and forward its content to ``chat_id``.

    Runs under ``download_semaphore``. Any query string is stripped from the
    URL, the message is fetched with the ``user`` client, and then:

    * media groups are delegated to ``processMediaGroup``;
    * single media is downloaded and re-uploaded through ``send_media``;
    * text-only posts are re-sent as text.

    Errors are reported to the user as replies rather than raised.

    Args:
        chat_id: Chat that requested the download.
        reply_to_id: Id of the requesting message; replies are attached to it
            and it names the per-request download folder.
        post_url: Telegram post link.
    """
    async with download_semaphore:
        if "?" in post_url:
            post_url = post_url.split("?", 1)[0]
        message = _MsgProxy(chat_id, reply_to_id)
        try:
            _cid, message_id = getChatMsgID(post_url)
            chat_message = await user.get_messages(
                chat_id=_cid, message_ids=message_id)
            LOGGER(__name__).info(f"Downloading from: {post_url}")

            # Reject oversized files before downloading anything.
            if chat_message.document or chat_message.video or chat_message.audio:
                file_size = (
                    chat_message.document.file_size if chat_message.document
                    else chat_message.video.file_size if chat_message.video
                    else chat_message.audio.file_size)
                if not await fileSizeLimit(file_size, message, "download",
                                           user.me.is_premium):
                    return

            parsed_caption = await get_parsed_msg(
                chat_message.caption or "", chat_message.caption_entities)
            parsed_text = await get_parsed_msg(
                chat_message.text or "", chat_message.entities)

            if chat_message.media_group_id:
                if not await processMediaGroup(chat_message, bot, message):
                    await message.reply(
                        "**Could not extract any valid media from the media group.**")
                return
            elif chat_message.media:
                start_time = time()
                progress_message = await bot.send_message(
                    chat_id, "**π₯ Downloading...**",
                    reply_to_message_id=reply_to_id)
                filename = get_file_name(message_id, chat_message)
                download_path = get_download_path(reply_to_id, filename)
                media_path = None
                try:
                    # One retry after a FloodWait with a positive wait time.
                    for attempt in range(2):
                        try:
                            media_path = await chat_message.download(
                                file_name=download_path,
                                progress=Leaves.progress_for_pyrogram,
                                progress_args=progressArgs(
                                    "π₯ Downloading", progress_message, start_time))
                            break
                        except FloodWait as e:
                            wait_s = int(getattr(e, "value", 0) or 0)
                            if wait_s > 0 and attempt == 0:
                                await asyncio.sleep(wait_s + 1)
                                continue
                            raise
                    if not media_path or not os.path.exists(media_path):
                        await progress_message.edit(
                            "**β Download failed: File not saved properly**")
                        return
                    file_size = os.path.getsize(media_path)
                    if file_size == 0:
                        await progress_message.edit("**β Download failed: File is empty**")
                        return
                    media_type = (
                        "photo" if chat_message.photo
                        else "video" if chat_message.video
                        else "audio" if chat_message.audio
                        else "document")
                    await send_media(bot, message, media_path, media_type,
                                     parsed_caption, progress_message, start_time)
                finally:
                    # Always remove the local copy, including when the upload
                    # fails or the task is cancelled, so failed requests do
                    # not pile up files on disk.
                    cleanup_download(media_path or download_path)
                await progress_message.delete()
            elif chat_message.text or chat_message.caption:
                await message.reply(parsed_text or parsed_caption)
            else:
                await message.reply("**No media or text found in the post URL.**")
        except FloodWait as e:
            wait_s = int(getattr(e, "value", 0) or 0)
            LOGGER(__name__).warning(f"FloodWait: {wait_s}s")
            if wait_s > 0:
                await asyncio.sleep(wait_s + 1)
        except (PeerIdInvalid, BadRequest, KeyError):
            await message.reply("**Make sure the user client is part of the chat.**")
        except Exception as e:
            await message.reply(f"**β {str(e)}**")
            LOGGER(__name__).error(traceback.format_exc())
def _tally_batch_results(results) -> tuple:
    """Count gathered download outcomes, stopping at the first cancelled one.

    Returns:
        ``(finished, failed, cancelled)``. ``cancelled`` is checked before the
        generic exception test because ``asyncio.CancelledError`` is not a
        subclass of ``Exception`` on current Python versions.
    """
    finished = failed = 0
    for outcome in results:
        if isinstance(outcome, asyncio.CancelledError):
            return finished, failed, True
        if isinstance(outcome, Exception):
            failed += 1
        else:
            finished += 1
    return finished, failed, False


async def handle_batch_download(chat_id: int, reply_to_id: int,
                                start_link: str, end_link: str):
    """Download every post between ``start_link`` and ``end_link`` (inclusive).

    Both links must point at the same chat and the start id must not exceed
    the end id. Posts are fetched one by one; empty posts and already-seen
    media groups are skipped. Downloads are started through ``track_task``
    and awaited in groups of ``PyroConf.BATCH_SIZE``, pausing
    ``PyroConf.FLOOD_WAIT_DELAY`` seconds between groups. A summary is sent
    at the end, or a cancellation notice as soon as a download task turns
    out to have been cancelled.

    Args:
        chat_id: Chat that requested the batch.
        reply_to_id: Id of the requesting message.
        start_link: Link of the first post.
        end_link: Link of the last post.
    """
    message = _MsgProxy(chat_id, reply_to_id)
    try:
        start_chat, start_id = getChatMsgID(start_link)
        end_chat, end_id = getChatMsgID(end_link)
    except Exception as e:
        await message.reply(f"**β Error parsing links:\n{e}**")
        return
    if start_chat != end_chat:
        await message.reply("**β Both links must be from the same channel.**")
        return
    if start_id > end_id:
        await message.reply("**β Invalid range: start ID cannot exceed end ID.**")
        return

    # Best-effort lookup of the chat; failures are ignored on purpose.
    try:
        await user.get_chat(start_chat)
    except Exception:
        pass

    prefix = start_link.rsplit("/", 1)[0]
    loading = await bot.send_message(
        chat_id, f"π₯ **Downloading posts {start_id}β{end_id}β¦**",
        reply_to_message_id=reply_to_id)
    downloaded = skipped = failed = 0
    processed_media_groups: set = set()
    batch_tasks = []

    for msg_id in range(start_id, end_id + 1):
        url = f"{prefix}/{msg_id}"
        try:
            chat_msg = await user.get_messages(
                chat_id=start_chat, message_ids=msg_id)
            if not chat_msg:
                skipped += 1
                continue
            if chat_msg.media_group_id:
                # A media group is downloaded once, via its first member.
                if chat_msg.media_group_id in processed_media_groups:
                    skipped += 1
                    continue
                processed_media_groups.add(chat_msg.media_group_id)
            if not (chat_msg.media_group_id or chat_msg.media
                    or chat_msg.text or chat_msg.caption):
                skipped += 1
                continue
            batch_tasks.append(
                track_task(handle_download(chat_id, reply_to_id, url)))
            if len(batch_tasks) >= PyroConf.BATCH_SIZE:
                results = await asyncio.gather(*batch_tasks, return_exceptions=True)
                done, errors, cancelled = _tally_batch_results(results)
                downloaded += done
                failed += errors
                if cancelled:
                    await loading.delete()
                    await message.reply(
                        f"**β Batch canceled** after `{downloaded}` posts.")
                    return
                batch_tasks.clear()
                await asyncio.sleep(PyroConf.FLOOD_WAIT_DELAY)
        except Exception as e:
            failed += 1
            LOGGER(__name__).error(f"Error at {url}: {e}")

    # Flush the last, partially filled group. Cancellation is handled the
    # same way as inside the loop, so cancelled tasks are not counted as
    # successful downloads.
    if batch_tasks:
        results = await asyncio.gather(*batch_tasks, return_exceptions=True)
        done, errors, cancelled = _tally_batch_results(results)
        downloaded += done
        failed += errors
        if cancelled:
            await loading.delete()
            await message.reply(
                f"**β Batch canceled** after `{downloaded}` posts.")
            return

    await loading.delete()
    await message.reply(
        "**β Batch Process Complete!**\n"
        "βββββββββββββββββββ\n"
        f"π₯ **Downloaded** : `{downloaded}` post(s)\n"
        f"βοΈ **Skipped** : `{skipped}` (no content)\n"
        f"β **Failed** : `{failed}` error(s)")
| # ============================================================ | |
| # FastAPI lifespan | |
| # ============================================================ | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: start both Pyrogram clients, stop them on shutdown.

    Decorated with ``asynccontextmanager`` because the function is handed to
    ``FastAPI(lifespan=...)``, which expects an async context manager rather
    than a bare async generator function. The download semaphore is created
    here so that it is built while the serving event loop is running.
    """
    global download_semaphore
    download_semaphore = asyncio.Semaphore(PyroConf.MAX_CONCURRENT_DOWNLOADS)
    LOGGER(__name__).info("Starting Pyrogram clientsβ¦")
    await user.start()
    await bot.start()
    LOGGER(__name__).info("Bot ready β waiting for webhook updates.")
    try:
        yield
    finally:
        # Runs even when the application exits with an error.
        LOGGER(__name__).info("Shutting downβ¦")
        await bot.stop()
        await user.stop()
# ASGI application object; `lifespan` starts the Pyrogram clients before
# serving and stops them on shutdown.
app = FastAPI(lifespan=lifespan)
| # ============================================================ | |
| # Webhook endpoint | |
| # ============================================================ | |
async def telegram_webhook(request: Request):
    """Handle one Telegram update delivered as a JSON request body.

    Instant commands (``/start``, ``/help``, ``/stats``, ``/killall``,
    ``/cleanup``) are answered in the response body itself. ``/logs``,
    ``/dl``, ``/bdl`` and plain-text links start a background task via
    ``track_task`` and the request is acknowledged with ``{"status": "ok"}``.
    Updates without a ``message`` are acknowledged and ignored.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view — confirm it is registered on `app`.
    update = await request.json()
    LOGGER(__name__).debug(f"Update: {update}")
    if "message" not in update:
        return JSONResponse({"status": "ok"})

    msg = update["message"]
    chat_id = msg["chat"]["id"]
    text = msg.get("text", "").strip()
    msg_id = msg["message_id"]

    # Commands answered directly in the response, checked in this order.
    instant_commands = (
        ("/start", handle_start),
        ("/help", handle_help),
        ("/stats", handle_stats),
        ("/killall", handle_killall),
        ("/cleanup", handle_cleanup),
    )
    for command_prefix, build_reply in instant_commands:
        if text.startswith(command_prefix):
            return JSONResponse(build_reply(chat_id))

    if text.startswith("/logs"):
        async def _send_logs():
            if not os.path.exists("logs.txt"):
                await bot.send_message(chat_id, "**No log file found.**",
                                       reply_to_message_id=msg_id)
                return
            await bot.send_document(chat_id, "logs.txt", caption="**Logs**",
                                    reply_to_message_id=msg_id)

        track_task(_send_logs())
        return JSONResponse({"status": "ok"})

    if text.startswith("/dl"):
        pieces = text.split(None, 1)
        if len(pieces) < 2:
            return JSONResponse(
                _msg(chat_id, "**Provide a post URL after the /dl command.**"))
        track_task(handle_download(chat_id, msg_id, pieces[1].strip()))
        return JSONResponse({"status": "ok"})

    if text.startswith("/bdl"):
        pieces = text.split()
        well_formed = (len(pieces) == 3
                       and all(link.startswith("https://t.me/") for link in pieces[1:]))
        if not well_formed:
            return JSONResponse(_msg(
                chat_id,
                "π **Batch Download**\n`/bdl start_link end_link`\n\n"
                "π‘ Example:\n"
                "`/bdl https://t.me/mychannel/100 https://t.me/mychannel/120`"))
        track_task(handle_batch_download(chat_id, msg_id, pieces[1], pieces[2]))
        return JSONResponse({"status": "ok"})

    # Any other non-command text is treated as a post link.
    if text and not text.startswith("/"):
        track_task(handle_download(chat_id, msg_id, text))
    return JSONResponse({"status": "ok"})