File size: 6,337 Bytes
e1a2a92 cdb5090 e1a2a92 3738e83 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | import asyncio
from pathlib import Path

from pyrogram import Client, filters
from pyrogram.types import (
    CallbackQuery,
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    Message,
)

import config
from utils.logger import Logger
# Module-level logger, named after this module.
logger = Logger(__name__)
START_CMD = """π **Welcome To TG Drive's Bot Mode**
You can use this bot to upload files to your TG Drive website directly instead of doing it from website.
π **Commands:**
/set_folder - Set folder for file uploads
/current_folder - Check current folder
π€ **How To Upload Files:** Send a file to this bot and it will be uploaded to your TG Drive website. You can also set a folder for file uploads using /set_folder command.
Read more about [TG Drive's Bot Mode](https://github.com/TechShreyash/TGDrive#tg-drives-bot-mode)
"""
# Pending /set_folder requests:
# request id -> {folder id: (folder path, folder name)}
SET_FOLDER_PATH_CACHE = {}

# Both are injected by start_bot_mode() before the bot is started.
DRIVE_DATA = None
BOT_MODE = None

# The client is given this directory as its workdir, so the directory itself
# (not its parent, which is just ".") has to exist before the client starts.
session_cache_path = Path("./cache")
session_cache_path.mkdir(parents=True, exist_ok=True)

main_bot = Client(
    name=config.MAIN_BOT_SESSION.replace(".session", ""),  # Remove .session extension for name
    api_id=config.API_ID,
    api_hash=config.API_HASH,
    bot_token=config.MAIN_BOT_TOKEN,
    sleep_threshold=config.SLEEP_THRESHOLD,
    workdir=session_cache_path,
)
@main_bot.on_message(
    filters.command(["start", "help"])
    & filters.private
    & filters.user(config.TELEGRAM_ADMIN_IDS)
)
async def start_handler(client: Client, message: Message):
    """Answer /start and /help from an admin in a private chat with the help text."""
    help_text = START_CMD
    await message.reply_text(help_text)
@main_bot.on_message(
    filters.command("set_folder")
    & filters.private
    & filters.user(config.TELEGRAM_ADMIN_IDS),
)
async def set_folder_handler(client: Client, message: Message):
    """Let an admin choose the folder that uploaded files are stored in.

    Keeps asking for a folder name until the drive search returns at least
    one folder, then replies with one inline button per matching folder.
    The candidate folders are stored in ``SET_FOLDER_PATH_CACHE`` under a
    fresh request id that is embedded in each button's callback data.

    The conversation ends early when the admin sends ``/cancel`` or does not
    answer within 60 seconds.
    """
    while True:
        try:
            answer = await message.ask(
                "Send the folder name where you want to upload files\n\n/cancel to cancel",
                timeout=60,
                filters=filters.text,
            )
        except asyncio.TimeoutError:
            await message.reply_text("Timeout\n\nUse /set_folder to set folder again")
            return

        # Strip before comparing so " /cancel " still cancels.
        folder_name = answer.text.strip()
        if folder_name.lower() == "/cancel":
            await message.reply_text("Cancelled")
            return

        search_result = DRIVE_DATA.search_file_folder(folder_name)

        # The search returns files as well; only folders are valid targets.
        folders = {
            item.id: item
            for item in search_result.values()
            if item.type == "folder"
        }
        if folders:
            break
        await message.reply_text(f"No Folder found with name {folder_name}")

    # Entries are deleted once a request is answered, so len() + 1 could
    # collide with a request that is still pending; go past the highest id.
    folder_cache_id = max(SET_FOLDER_PATH_CACHE, default=0) + 1

    buttons = []
    folder_cache = {}
    # Build buttons from the filtered folders only, never from raw results.
    for folder in folders.values():
        parent_path = folder.path.strip("/")
        # Normalised to a single leading slash, e.g. "/parent/<folder id>".
        folder_path = "/" + ("/" + parent_path + "/" + folder.id).strip("/")
        folder_cache[folder.id] = (folder_path, folder.name)
        buttons.append(
            [
                InlineKeyboardButton(
                    folder.name,
                    callback_data=f"set_folder_{folder_cache_id}_{folder.id}",
                )
            ]
        )
    SET_FOLDER_PATH_CACHE[folder_cache_id] = folder_cache

    await message.reply_text(
        "Select the folder where you want to upload files",
        reply_markup=InlineKeyboardMarkup(buttons),
    )
@main_bot.on_callback_query(
    filters.user(config.TELEGRAM_ADMIN_IDS) & filters.regex(r"set_folder_")
)
async def set_folder_callback(client: Client, callback_query: CallbackQuery):
    """Apply the folder chosen through a /set_folder inline button.

    The callback data has the form ``set_folder_<request id>_<folder id>``.
    The request id selects an entry in ``SET_FOLDER_PATH_CACHE``; the entry
    is removed once a folder has been applied. Malformed data, an unknown
    request id or an unknown folder id are all reported as an expired
    request and the button message is deleted.
    """
    cache_id = None
    folder_id = None
    try:
        # maxsplit keeps a folder id that itself contains "_" in one piece.
        _, _, raw_cache_id, folder_id = callback_query.data.split("_", 3)
        cache_id = int(raw_cache_id)
    except ValueError:
        # Too few fields or a non-numeric request id: handled as expired.
        pass

    folder_entry = SET_FOLDER_PATH_CACHE.get(cache_id, {}).get(folder_id)
    if folder_entry is None:
        await callback_query.answer("Request Expired, Send /set_folder again")
        await callback_query.message.delete()
        return

    folder_path, name = folder_entry
    SET_FOLDER_PATH_CACHE.pop(cache_id, None)
    BOT_MODE.set_folder(folder_path, name)

    await callback_query.answer(f"Folder Set Successfully To : {name}")
    await callback_query.message.edit(
        f"Folder Set Successfully To : {name}\n\nNow you can send / forward files to me and it will be uploaded to this folder."
    )
@main_bot.on_message(
    filters.command("current_folder")
    & filters.private
    & filters.user(config.TELEGRAM_ADMIN_IDS)
)
async def current_folder_handler(client: Client, message: Message):
    """Reply with the name of the folder that uploads currently go to."""
    folder_name = BOT_MODE.current_folder_name
    await message.reply_text(f"Current Folder: {folder_name}")
# Handling when any file is sent to the bot
@main_bot.on_message(
    filters.private
    & filters.user(config.TELEGRAM_ADMIN_IDS)
    & (
        filters.document
        | filters.video
        | filters.audio
        | filters.photo
        | filters.sticker
    )
)
async def file_handler(client: Client, message: Message):
    """Store a media message sent by an admin in the current upload folder.

    The message is copied to ``config.STORAGE_CHANNEL``, the copy is
    registered with ``DRIVE_DATA.new_file`` under ``BOT_MODE.current_folder``
    and a confirmation is sent back to the admin.
    """
    copied_message = await message.copy(config.STORAGE_CHANNEL)
    media = (
        copied_message.document
        or copied_message.video
        or copied_message.audio
        or copied_message.photo
        or copied_message.sticker
    )

    # Photos carry no file_name attribute and other media may report None,
    # so fall back to a name derived from the stored message id.
    file_name = getattr(media, "file_name", None)
    if not file_name:
        suffix = ".jpg" if copied_message.photo else ""
        file_name = f"file_{copied_message.id}{suffix}"

    DRIVE_DATA.new_file(
        BOT_MODE.current_folder,
        file_name,
        copied_message.id,
        media.file_size,
    )

    await message.reply_text(
        f"""✅ File Uploaded Successfully To Your TG Drive Website
**File Name:** {file_name}
**Folder:** {BOT_MODE.current_folder_name}
"""
    )
async def start_bot_mode(d, b):
    """Store the shared objects used by the handlers and start the main bot.

    Args:
        d: Object assigned to the module-level ``DRIVE_DATA``.
        b: Object assigned to the module-level ``BOT_MODE``.

    Any exception raised while starting the bot or sending the start-up
    notice to ``config.STORAGE_CHANNEL`` is logged and not re-raised.
    """
    global DRIVE_DATA, BOT_MODE
    DRIVE_DATA, BOT_MODE = d, b

    logger.info("Starting Main Bot")
    try:
        await main_bot.start()
        await main_bot.send_message(
            config.STORAGE_CHANNEL,
            "Main Bot Started -> TG Drive's Bot Mode Enabled",
        )
        logger.info("Main Bot Started")
        logger.info("TG Drive's Bot Mode Enabled")
    except Exception as exc:
        # Don't raise - allow the app to continue without bot mode
        reason = str(exc) or repr(exc)
        logger.error(f"Failed to start Main Bot: {reason}")
        logger.warning("Bot Mode will not be available - Main Bot connection failed")
|