| import os |
| import threading |
| import time |
| import random |
| from datetime import datetime |
|
|
| from flask_srvr import run_flask |
| from telegram import * |
| from telegram.ext import * |
|
|
| import mdb |
|
|
| |
| |
| |
| def start_flask(): |
| t = threading.Thread(target=run_flask) |
| t.daemon = True |
| t.start() |
|
|
| start_flask() |
|
|
| |
| |
| |
| |
| BOT_TOKEN = os.getenv("tgtkn") |
| GROUP_ID = -1003799294466 |
| ADMIN_ID = 83939584939485 |
| MAX_STORAGE = 15360 * 1024 * 1024 |
|
|
| def storage_bar(used, total=MAX_STORAGE): |
| pct = min(used / total, 1.0) |
| filled = int(pct * 10) |
| return "[" + "β" * filled + "β" * (10 - filled) + "]" |
|
|
| def breadcrumb(path_list): |
| if not path_list: return "root" |
| return " / ".join(path_list) |
|
|
| def get_icon(filename): |
| ext = filename.split('.')[-1].lower() if '.' in filename else '' |
| if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp']: return 'πΌ' |
| if ext in ['mp4', 'mkv', 'avi']: return 'π₯' |
| if ext in ['mp3', 'wav', 'ogg', 'm4a']: return 'π΅' |
| if ext in ['zip', 'rar', '7z']: return 'π' |
| if ext in ['pdf']: return 'π' |
| if ext in ['txt', 'doc', 'docx']: return 'π' |
| return 'π' |
|
|
| |
| |
| |
| user_state = {} |
| last_menu_msg = {} |
|
|
| |
| |
| |
| def default_data(): |
| return { |
| "files": [], |
| "index": {}, |
| "folders": { |
| "root": {"files": [], "folders": [], "parent": None} |
| } |
| } |
|
|
| def get_data(uid): |
| data = mdb.get(f"user_{uid}", "json") or default_data() |
| if "folders" not in data: |
| data["folders"] = default_data()["folders"] |
| if "root" not in data["folders"]: |
| data["folders"]["root"] = {"files": [], "folders": [], "parent": None} |
| return data |
|
|
| def save_data(uid, data): |
| mdb.set(f"user_{uid}", "json", data) |
|
|
| def is_admin(uid): |
| admins = mdb.get("admins", "json") or [] |
| return uid == ADMIN_ID or uid in admins |
|
|
| |
| |
| |
| def menu_text(uid): |
| data = get_data(uid) |
| used = sum(f.get("file_size", 0) for f in data["files"]) |
|
|
| return f"""βοΈ Cloud Storage Bot |
| |
| π Storage: |
| {storage_bar(used)} |
| {round(used/1024/1024, 2)} MB / 15360 MB |
| """ |
|
|
| def menu_kb(): |
| return InlineKeyboardMarkup([ |
| [InlineKeyboardButton("π€ Upload Here", callback_data="upload_root")], |
| [InlineKeyboardButton("π My Files", callback_data="open_root")], |
| [InlineKeyboardButton("π Search", callback_data="search")], |
| [InlineKeyboardButton("β New Folder", callback_data="newfolder_root")] |
| ]) |
|
|
| async def send_main_menu(context: ContextTypes.DEFAULT_TYPE, uid: int): |
| """Naya menu bhejega aur purana wala delete karega""" |
| if uid in last_menu_msg: |
| try: |
| await context.bot.delete_message(chat_id=uid, message_id=last_menu_msg[uid]) |
| except Exception: |
| pass |
|
|
| msg = await context.bot.send_message(chat_id=uid, text=menu_text(uid), reply_markup=menu_kb()) |
| last_menu_msg[uid] = msg.message_id |
|
|
| |
| |
| |
| def build_folder(uid, data, folder): |
| if folder not in data["folders"]: |
| folder = "root" |
| |
| f = data["folders"][folder] |
|
|
| |
| path = [] |
| cur = folder |
| while cur: |
| path.append(cur) |
| cur = data["folders"][cur]["parent"] |
| path.reverse() |
|
|
| text = f"π <b>{breadcrumb(path)}</b>\n\n" |
| kb = [] |
|
|
| |
| for sub in f["folders"]: |
| kb.append([InlineKeyboardButton(f"π {sub}", callback_data=f"open_{sub}")]) |
|
|
| |
| for fid in f["files"]: |
| if fid in data["index"]: |
| file = data["index"][fid] |
| kb.append([InlineKeyboardButton( |
| f"{get_icon(file['original_name'])} {file['original_name']}", |
| callback_data=f"file_{fid}" |
| )]) |
|
|
| |
| kb.append([InlineKeyboardButton("β Add Folder", callback_data=f"newfolder_{folder}")]) |
|
|
| if folder != "root": |
| kb.append([ |
| InlineKeyboardButton("βοΈ Rename", callback_data=f"renamefolder_{folder}"), |
| InlineKeyboardButton("π Delete", callback_data=f"delfolder_{folder}") |
| ]) |
|
|
| kb.append([InlineKeyboardButton("π€ Upload Here", callback_data=f"upload_{folder}")]) |
|
|
| parent = f["parent"] or "root" |
| if folder != "root": |
| kb.append([InlineKeyboardButton("π Back", callback_data=f"open_{parent}")]) |
| |
| kb.append([InlineKeyboardButton("π Home / Cancel", callback_data="cancel")]) |
|
|
| return text, InlineKeyboardMarkup(kb) |
|
|
| |
| |
| |
| async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| uid = update.effective_user.id |
| user_state.pop(uid, None) |
| await send_main_menu(context, uid) |
|
|
| |
| |
| |
| async def buttons(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| try: |
| q = update.callback_query |
| await q.answer() |
| uid = q.from_user.id |
| data = get_data(uid) |
|
|
| |
| if q.data == "cancel": |
| user_state.pop(uid, None) |
| await q.message.delete() |
| return await send_main_menu(context, uid) |
|
|
| |
| if q.data.startswith("open_"): |
| folder = q.data.split("_", 1)[1] |
| text, kb = build_folder(uid, data, folder) |
| return await q.message.edit_text(text, parse_mode="HTML", reply_markup=kb) |
|
|
| |
| if q.data == "search": |
| user_state[uid] = {"mode": "search"} |
| kb = [[InlineKeyboardButton("β Cancel", callback_data="cancel")]] |
| return await q.message.edit_text("π Kaunsi file search karni hai? Naam likh kar bhejo:", reply_markup=InlineKeyboardMarkup(kb)) |
|
|
| |
| if q.data.startswith("upload_"): |
| folder = q.data.split("_", 1)[1] |
| user_state[uid] = {"mode": "upload", "folder": folder} |
| kb = [[InlineKeyboardButton("β Cancel", callback_data="cancel")]] |
| return await q.message.edit_text(f"π€ Apni image, video ya document yaha send karo (Folder: <b>{folder}</b>)", parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb)) |
|
|
| |
| if q.data.startswith("file_"): |
| fid = q.data.split("_", 1)[1] |
| if fid not in data["index"]: |
| return await q.message.edit_text("β File not found.", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("π Back", callback_data="open_root")]])) |
| |
| f = data["index"][fid] |
| kb = [ |
| [InlineKeyboardButton("π Preview / Get File", callback_data=f"preview_{fid}")], |
| [InlineKeyboardButton("π Delete File", callback_data=f"delfile_{fid}")], |
| [InlineKeyboardButton("π Go Back", callback_data="open_root")] |
| ] |
| return await q.message.edit_text(f"π Name: {f['original_name']}\nπΎ Size: {round(f.get('file_size', 0)/1024/1024, 2)} MB", reply_markup=InlineKeyboardMarkup(kb)) |
|
|
| |
| if q.data.startswith("preview_"): |
| fid = q.data.split("_", 1)[1] |
| f = data["index"][fid] |
| ext = f["original_name"].lower().split('.')[-1] |
|
|
| if ext in ["jpg", "png", "jpeg", "webp", "gif"]: |
| await context.bot.send_photo(uid, f["file_id"]) |
| elif ext in ["mp4", "mkv", "avi"]: |
| await context.bot.send_video(uid, f["file_id"]) |
| elif ext in ["mp3", "wav", "ogg"]: |
| await context.bot.send_audio(uid, f["file_id"]) |
| elif ext in ["voice", "oga"]: |
| await context.bot.send_voice(uid, f["file_id"]) |
| else: |
| await context.bot.send_document(uid, f["file_id"]) |
| return |
|
|
| |
| if q.data.startswith("delfile_"): |
| fid = q.data.split("_", 1)[1] |
| if fid in data["index"]: |
| del data["index"][fid] |
| data["files"] = [f for f in data["files"] if f["file_name"] != fid] |
| for f_name, f_data in data["folders"].items(): |
| if fid in f_data["files"]: |
| f_data["files"].remove(fid) |
| save_data(uid, data) |
| await q.message.delete() |
| await context.bot.send_message(uid, "β
File deleted successfully.") |
| return await send_main_menu(context, uid) |
|
|
| |
| if q.data.startswith("newfolder_"): |
| parent = q.data.split("_", 1)[1] |
| user_state[uid] = {"mode": "new_folder", "parent": parent} |
| kb = [[InlineKeyboardButton("β Cancel", callback_data="cancel")]] |
| return await q.message.edit_text("π Naye folder ka naam type karke bhejo:", reply_markup=InlineKeyboardMarkup(kb)) |
|
|
| |
| if q.data.startswith("delfolder_"): |
| folder = q.data.split("_", 1)[1] |
| parent = data["folders"][folder]["parent"] |
| if parent and folder in data["folders"][parent]["folders"]: |
| data["folders"][parent]["folders"].remove(folder) |
| |
| del data["folders"][folder] |
| save_data(uid, data) |
| |
| text, kb = build_folder(uid, data, parent) |
| return await q.message.edit_text(text, parse_mode="HTML", reply_markup=kb) |
|
|
| |
| if q.data.startswith("renamefolder_"): |
| folder = q.data.split("_", 1)[1] |
| user_state[uid] = {"mode": "rename_folder", "folder": folder} |
| kb = [[InlineKeyboardButton("β Cancel", callback_data="cancel")]] |
| return await q.message.edit_text(f"βοΈ <b>{folder}</b> ke liye naya naam bhejo:", parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb)) |
|
|
| except Exception as e: |
| print("BTN ERROR:", e) |
|
|
| |
| |
| |
| async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| try: |
| uid = update.effective_user.id |
| if uid not in user_state or user_state[uid].get("mode") != "upload": |
| await update.message.reply_text("β Upload karne ke liye pehle folder me jake 'Upload Here' pe click karo.") |
| return |
|
|
| state = user_state[uid] |
| folder = state["folder"] |
| data = get_data(uid) |
| msg = update.message |
|
|
| |
| if getattr(msg, 'document', None): |
| doc = msg.document |
| original_name = doc.file_name or f"document_{int(time.time())}" |
| elif getattr(msg, 'photo', None): |
| doc = msg.photo[-1] |
| original_name = f"photo_{int(time.time())}.jpg" |
| elif getattr(msg, 'video', None): |
| doc = msg.video |
| original_name = doc.file_name or f"video_{int(time.time())}.mp4" |
| elif getattr(msg, 'audio', None): |
| doc = msg.audio |
| original_name = doc.file_name or f"audio_{int(time.time())}.mp3" |
| elif getattr(msg, 'voice', None): |
| doc = msg.voice |
| original_name = f"voice_{int(time.time())}.ogg" |
| else: |
| return await update.message.reply_text("β Unsupported file format.") |
|
|
| used = sum(f.get("file_size", 0) for f in data["files"]) |
| if not is_admin(uid) and used + getattr(doc, 'file_size', 0) > MAX_STORAGE: |
| return await update.message.reply_text("β Storage limit reach ho gayi hai (15GB max).") |
|
|
| |
| await msg.copy(chat_id=GROUP_ID) |
| |
| file_id = doc.file_id |
| name = f"{uid}_{int(time.time())}_{random.randint(100,999)}" |
|
|
| entry = { |
| "file_name": name, |
| "original_name": original_name, |
| "file_id": file_id, |
| "file_size": getattr(doc, 'file_size', 0), |
| "upload_date": str(datetime.now()) |
| } |
|
|
| data["files"].append(entry) |
| data["index"][name] = entry |
| data["folders"][folder]["files"].append(name) |
|
|
| save_data(uid, data) |
| user_state.pop(uid, None) |
|
|
| |
| try: |
| await update.message.reply_text(f"β
<b>{original_name}</b> successfully upload ho gayi!", parse_mode="HTML") |
| except Exception as e: |
| print(f"MSG ERROR: {e}") |
| await update.message.reply_text("β
File successfully upload ho gayi!") |
|
|
| time.sleep(1) |
| await send_main_menu(context, uid) |
|
|
| except Exception as e: |
| print("UPLOAD ERROR:", e) |
|
|
| |
| |
| |
| async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| try: |
| uid = update.effective_user.id |
| if uid not in user_state: |
| return |
|
|
| state = user_state[uid] |
| data = get_data(uid) |
| text_input = update.message.text |
|
|
| |
| if state["mode"] == "search": |
| query = text_input.lower() |
| results = [] |
| for fid, f in data["index"].items(): |
| if query in f["original_name"].lower(): |
| results.append(f) |
|
|
| if not results: |
| await update.message.reply_text("β Koi aisi file nahi mili.") |
| else: |
| kb = [] |
| for r in results[:10]: |
| kb.append([InlineKeyboardButton(f"{get_icon(r['original_name'])} {r['original_name']}", callback_data=f"file_{r['file_name']}")]) |
| kb.append([InlineKeyboardButton("β Clear Search", callback_data="cancel")]) |
| await update.message.reply_text(f"π Top results for '<b>{text_input}</b>':", parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb)) |
| |
| user_state.pop(uid, None) |
| return |
|
|
| |
| if state["mode"] == "new_folder": |
| name = text_input |
| parent = state["parent"] |
|
|
| if name in data["folders"]: |
| await update.message.reply_text("β Is naam ka folder already hai.") |
| else: |
| data["folders"][name] = {"files": [], "folders": [], "parent": parent} |
| data["folders"][parent]["folders"].append(name) |
| save_data(uid, data) |
| await update.message.reply_text(f"β
Folder <b>{name}</b> ban gaya!", parse_mode="HTML") |
|
|
| user_state.pop(uid, None) |
| return await send_main_menu(context, uid) |
|
|
| |
| if state["mode"] == "rename_folder": |
| old_name = state["folder"] |
| new_name = text_input |
| parent = data["folders"][old_name]["parent"] |
|
|
| if new_name in data["folders"]: |
| await update.message.reply_text("β Ye naam already exists.") |
| else: |
| data["folders"][new_name] = data["folders"].pop(old_name) |
| if parent: |
| idx = data["folders"][parent]["folders"].index(old_name) |
| data["folders"][parent]["folders"][idx] = new_name |
| for sub in data["folders"][new_name]["folders"]: |
| data["folders"][sub]["parent"] = new_name |
|
|
| save_data(uid, data) |
| await update.message.reply_text(f"β
Folder ka naam <b>{new_name}</b> ho gaya.", parse_mode="HTML") |
|
|
| user_state.pop(uid, None) |
| return await send_main_menu(context, uid) |
|
|
| except Exception as e: |
| print("TEXT ERROR:", e) |
|
|
| |
| |
| |
| async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None: |
| print(f"β οΈ EXCEPTION AAYA HAI: {context.error}") |
|
|
| |
| |
| |
| if not BOT_TOKEN: |
| print("β ERROR: Bot token nahi mila! Apna token set karo.") |
| exit(1) |
|
|
| app = ApplicationBuilder().token(BOT_TOKEN).build() |
|
|
| app.add_handler(CommandHandler("start", start)) |
| app.add_handler(CallbackQueryHandler(buttons)) |
| |
| app.add_handler(MessageHandler(filters.ALL & ~filters.TEXT & ~filters.COMMAND, handle_file)) |
| app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text)) |
| app.add_error_handler(error_handler) |
|
|
| print("π₯ FINAL BOT RUNNING - FULLY UPDATED & FIXED") |
| app.run_polling() |