TGDRIVE / utils /bot_mode.py
dragxd's picture
Add MAIN_BOT_SESSION config to support mainbot.session file
cdb5090
import asyncio
from pyrogram import Client, filters
from pyrogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
import config
from utils.logger import Logger
from pathlib import Path
logger = Logger(__name__)
START_CMD = """πŸš€ **Welcome To TG Drive's Bot Mode**
You can use this bot to upload files to your TG Drive website directly instead of doing it from website.
πŸ—„ **Commands:**
/set_folder - Set folder for file uploads
/current_folder - Check current folder
πŸ“€ **How To Upload Files:** Send a file to this bot and it will be uploaded to your TG Drive website. You can also set a folder for file uploads using /set_folder command.
Read more about [TG Drive's Bot Mode](https://github.com/TechShreyash/TGDrive#tg-drives-bot-mode)
"""
SET_FOLDER_PATH_CACHE = {} # Cache to store folder path for each folder id
DRIVE_DATA = None
BOT_MODE = None
session_cache_path = Path(f"./cache")
session_cache_path.parent.mkdir(parents=True, exist_ok=True)
main_bot = Client(
name=config.MAIN_BOT_SESSION.replace(".session", ""), # Remove .session extension for name
api_id=config.API_ID,
api_hash=config.API_HASH,
bot_token=config.MAIN_BOT_TOKEN,
sleep_threshold=config.SLEEP_THRESHOLD,
workdir=session_cache_path,
)
@main_bot.on_message(
filters.command(["start", "help"])
& filters.private
& filters.user(config.TELEGRAM_ADMIN_IDS),
)
async def start_handler(client: Client, message: Message):
await message.reply_text(START_CMD)
@main_bot.on_message(
filters.command("set_folder")
& filters.private
& filters.user(config.TELEGRAM_ADMIN_IDS),
)
async def set_folder_handler(client: Client, message: Message):
global SET_FOLDER_PATH_CACHE, DRIVE_DATA
while True:
try:
folder_name = await message.ask(
"Send the folder name where you want to upload files\n\n/cancel to cancel",
timeout=60,
filters=filters.text,
)
except asyncio.TimeoutError:
await message.reply_text("Timeout\n\nUse /set_folder to set folder again")
return
if folder_name.text.lower() == "/cancel":
await message.reply_text("Cancelled")
return
folder_name = folder_name.text.strip()
search_result = DRIVE_DATA.search_file_folder(folder_name)
# Get folders from search result
folders = {}
for item in search_result.values():
if item.type == "folder":
folders[item.id] = item
if len(folders) == 0:
await message.reply_text(f"No Folder found with name {folder_name}")
else:
break
buttons = []
folder_cache = {}
folder_cache_id = len(SET_FOLDER_PATH_CACHE) + 1
for folder in search_result.values():
path = folder.path.strip("/")
folder_path = "/" + ("/" + path + "/" + folder.id).strip("/")
folder_cache[folder.id] = (folder_path, folder.name)
buttons.append(
[
InlineKeyboardButton(
folder.name,
callback_data=f"set_folder_{folder_cache_id}_{folder.id}",
)
]
)
SET_FOLDER_PATH_CACHE[folder_cache_id] = folder_cache
await message.reply_text(
"Select the folder where you want to upload files",
reply_markup=InlineKeyboardMarkup(buttons),
)
@main_bot.on_callback_query(
filters.user(config.TELEGRAM_ADMIN_IDS) & filters.regex(r"set_folder_")
)
async def set_folder_callback(client: Client, callback_query: Message):
global SET_FOLDER_PATH_CACHE, BOT_MODE
folder_cache_id, folder_id = callback_query.data.split("_")[2:]
folder_path_cache = SET_FOLDER_PATH_CACHE.get(int(folder_cache_id))
if folder_path_cache is None:
await callback_query.answer("Request Expired, Send /set_folder again")
await callback_query.message.delete()
return
folder_path, name = folder_path_cache.get(folder_id)
del SET_FOLDER_PATH_CACHE[int(folder_cache_id)]
BOT_MODE.set_folder(folder_path, name)
await callback_query.answer(f"Folder Set Successfully To : {name}")
await callback_query.message.edit(
f"Folder Set Successfully To : {name}\n\nNow you can send / forward files to me and it will be uploaded to this folder."
)
@main_bot.on_message(
filters.command("current_folder")
& filters.private
& filters.user(config.TELEGRAM_ADMIN_IDS),
)
async def current_folder_handler(client: Client, message: Message):
global BOT_MODE
await message.reply_text(f"Current Folder: {BOT_MODE.current_folder_name}")
# Handling when any file is sent to the bot
@main_bot.on_message(
filters.private
& filters.user(config.TELEGRAM_ADMIN_IDS)
& (
filters.document
| filters.video
| filters.audio
| filters.photo
| filters.sticker
)
)
async def file_handler(client: Client, message: Message):
global BOT_MODE, DRIVE_DATA
copied_message = await message.copy(config.STORAGE_CHANNEL)
file = (
copied_message.document
or copied_message.video
or copied_message.audio
or copied_message.photo
or copied_message.sticker
)
DRIVE_DATA.new_file(
BOT_MODE.current_folder,
file.file_name,
copied_message.id,
file.file_size,
)
await message.reply_text(
f"""βœ… File Uploaded Successfully To Your TG Drive Website
**File Name:** {file.file_name}
**Folder:** {BOT_MODE.current_folder_name}
"""
)
async def start_bot_mode(d, b):
global DRIVE_DATA, BOT_MODE
DRIVE_DATA = d
BOT_MODE = b
logger.info("Starting Main Bot")
try:
await main_bot.start()
await main_bot.send_message(
config.STORAGE_CHANNEL, "Main Bot Started -> TG Drive's Bot Mode Enabled"
)
logger.info("Main Bot Started")
logger.info("TG Drive's Bot Mode Enabled")
except Exception as e:
error_msg = str(e) if str(e) else repr(e)
logger.error(f"Failed to start Main Bot: {error_msg}")
logger.warning("Bot Mode will not be available - Main Bot connection failed")
# Don't raise - allow the app to continue without bot mode