import os
import threading
import time
import random
from datetime import datetime
from flask_srvr import run_flask
from telegram import *
from telegram.ext import *
import mdb
# =========================
# š FLASK
# =========================
def start_flask():
t = threading.Thread(target=run_flask)
t.daemon = True
t.start()
start_flask()
# =========================
# āļø CONFIG & UTILS
# =========================
# Agar env variable nahi milta to yaha apna token daal sakte ho testing ke liye
BOT_TOKEN = os.getenv("tgtkn")
GROUP_ID = -1003799294466
ADMIN_ID = 83939584939485
MAX_STORAGE = 15360 * 1024 * 1024 # 15 GB max storage
def storage_bar(used, total=MAX_STORAGE):
pct = min(used / total, 1.0)
filled = int(pct * 10)
return "[" + "ā" * filled + "ā" * (10 - filled) + "]"
def breadcrumb(path_list):
if not path_list: return "root"
return " / ".join(path_list)
def get_icon(filename):
ext = filename.split('.')[-1].lower() if '.' in filename else ''
if ext in ['jpg', 'jpeg', 'png', 'gif', 'webp']: return 'š¼'
if ext in ['mp4', 'mkv', 'avi']: return 'š„'
if ext in ['mp3', 'wav', 'ogg', 'm4a']: return 'šµ'
if ext in ['zip', 'rar', '7z']: return 'š'
if ext in ['pdf']: return 'š'
if ext in ['txt', 'doc', 'docx']: return 'š'
return 'š'
# =========================
# š§ STATE
# =========================
user_state = {}
last_menu_msg = {} # Pichle menu message ko delete karne ke liye tracker
# =========================
# š DB
# =========================
def default_data():
return {
"files": [],
"index": {},
"folders": {
"root": {"files": [], "folders": [], "parent": None}
}
}
def get_data(uid):
data = mdb.get(f"user_{uid}", "json") or default_data()
if "folders" not in data:
data["folders"] = default_data()["folders"]
if "root" not in data["folders"]:
data["folders"]["root"] = {"files": [], "folders": [], "parent": None}
return data
def save_data(uid, data):
mdb.set(f"user_{uid}", "json", data)
def is_admin(uid):
admins = mdb.get("admins", "json") or []
return uid == ADMIN_ID or uid in admins
# =========================
# š MENU HELPERS
# =========================
def menu_text(uid):
data = get_data(uid)
used = sum(f.get("file_size", 0) for f in data["files"])
return f"""āļø Cloud Storage Bot
š Storage:
{storage_bar(used)}
{round(used/1024/1024, 2)} MB / 15360 MB
"""
def menu_kb():
return InlineKeyboardMarkup([
[InlineKeyboardButton("š¤ Upload Here", callback_data="upload_root")],
[InlineKeyboardButton("š My Files", callback_data="open_root")],
[InlineKeyboardButton("š Search", callback_data="search")],
[InlineKeyboardButton("ā New Folder", callback_data="newfolder_root")]
])
async def send_main_menu(context: ContextTypes.DEFAULT_TYPE, uid: int):
"""Naya menu bhejega aur purana wala delete karega"""
if uid in last_menu_msg:
try:
await context.bot.delete_message(chat_id=uid, message_id=last_menu_msg[uid])
except Exception:
pass # Message pehle hi delete ho chuka hoga ya permission nahi hogi
msg = await context.bot.send_message(chat_id=uid, text=menu_text(uid), reply_markup=menu_kb())
last_menu_msg[uid] = msg.message_id
# =========================
# š FOLDER VIEW
# =========================
def build_folder(uid, data, folder):
if folder not in data["folders"]:
folder = "root" # Fallback if folder was deleted
f = data["folders"][folder]
# Breadcrumb path
path = []
cur = folder
while cur:
path.append(cur)
cur = data["folders"][cur]["parent"]
path.reverse()
text = f"š {breadcrumb(path)}\n\n"
kb = []
# Folders list
for sub in f["folders"]:
kb.append([InlineKeyboardButton(f"š {sub}", callback_data=f"open_{sub}")])
# Files list
for fid in f["files"]:
if fid in data["index"]:
file = data["index"][fid]
kb.append([InlineKeyboardButton(
f"{get_icon(file['original_name'])} {file['original_name']}",
callback_data=f"file_{fid}"
)])
# Actions
kb.append([InlineKeyboardButton("ā Add Folder", callback_data=f"newfolder_{folder}")])
if folder != "root":
kb.append([
InlineKeyboardButton("āļø Rename", callback_data=f"renamefolder_{folder}"),
InlineKeyboardButton("š Delete", callback_data=f"delfolder_{folder}")
])
kb.append([InlineKeyboardButton("š¤ Upload Here", callback_data=f"upload_{folder}")])
parent = f["parent"] or "root"
if folder != "root":
kb.append([InlineKeyboardButton("š Back", callback_data=f"open_{parent}")])
kb.append([InlineKeyboardButton("š Home / Cancel", callback_data="cancel")])
return text, InlineKeyboardMarkup(kb)
# =========================
# š START
# =========================
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
uid = update.effective_user.id
user_state.pop(uid, None)
await send_main_menu(context, uid)
# =========================
# š BUTTONS
# =========================
async def buttons(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
q = update.callback_query
await q.answer()
uid = q.from_user.id
data = get_data(uid)
# Cancel & Go Home
if q.data == "cancel":
user_state.pop(uid, None)
await q.message.delete()
return await send_main_menu(context, uid)
# Open folder
if q.data.startswith("open_"):
folder = q.data.split("_", 1)[1]
text, kb = build_folder(uid, data, folder)
return await q.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
# Search Setup
if q.data == "search":
user_state[uid] = {"mode": "search"}
kb = [[InlineKeyboardButton("ā Cancel", callback_data="cancel")]]
return await q.message.edit_text("š Kaunsi file search karni hai? Naam likh kar bhejo:", reply_markup=InlineKeyboardMarkup(kb))
# Upload Setup
if q.data.startswith("upload_"):
folder = q.data.split("_", 1)[1]
user_state[uid] = {"mode": "upload", "folder": folder}
kb = [[InlineKeyboardButton("ā Cancel", callback_data="cancel")]]
return await q.message.edit_text(f"š¤ Apni image, video ya document yaha send karo (Folder: {folder})", parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb))
# View File
if q.data.startswith("file_"):
fid = q.data.split("_", 1)[1]
if fid not in data["index"]:
return await q.message.edit_text("ā File not found.", reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton("š Back", callback_data="open_root")]]))
f = data["index"][fid]
kb = [
[InlineKeyboardButton("š Preview / Get File", callback_data=f"preview_{fid}")],
[InlineKeyboardButton("š Delete File", callback_data=f"delfile_{fid}")],
[InlineKeyboardButton("š Go Back", callback_data="open_root")]
]
return await q.message.edit_text(f"š Name: {f['original_name']}\nš¾ Size: {round(f.get('file_size', 0)/1024/1024, 2)} MB", reply_markup=InlineKeyboardMarkup(kb))
# Preview File
if q.data.startswith("preview_"):
fid = q.data.split("_", 1)[1]
f = data["index"][fid]
ext = f["original_name"].lower().split('.')[-1]
if ext in ["jpg", "png", "jpeg", "webp", "gif"]:
await context.bot.send_photo(uid, f["file_id"])
elif ext in ["mp4", "mkv", "avi"]:
await context.bot.send_video(uid, f["file_id"])
elif ext in ["mp3", "wav", "ogg"]:
await context.bot.send_audio(uid, f["file_id"])
elif ext in ["voice", "oga"]:
await context.bot.send_voice(uid, f["file_id"])
else:
await context.bot.send_document(uid, f["file_id"])
return
# Delete File
if q.data.startswith("delfile_"):
fid = q.data.split("_", 1)[1]
if fid in data["index"]:
del data["index"][fid]
data["files"] = [f for f in data["files"] if f["file_name"] != fid]
for f_name, f_data in data["folders"].items():
if fid in f_data["files"]:
f_data["files"].remove(fid)
save_data(uid, data)
await q.message.delete()
await context.bot.send_message(uid, "ā
File deleted successfully.")
return await send_main_menu(context, uid)
# New Folder Setup
if q.data.startswith("newfolder_"):
parent = q.data.split("_", 1)[1]
user_state[uid] = {"mode": "new_folder", "parent": parent}
kb = [[InlineKeyboardButton("ā Cancel", callback_data="cancel")]]
return await q.message.edit_text("š Naye folder ka naam type karke bhejo:", reply_markup=InlineKeyboardMarkup(kb))
# Delete Folder
if q.data.startswith("delfolder_"):
folder = q.data.split("_", 1)[1]
parent = data["folders"][folder]["parent"]
if parent and folder in data["folders"][parent]["folders"]:
data["folders"][parent]["folders"].remove(folder)
del data["folders"][folder]
save_data(uid, data)
text, kb = build_folder(uid, data, parent)
return await q.message.edit_text(text, parse_mode="HTML", reply_markup=kb)
# Rename Folder Setup
if q.data.startswith("renamefolder_"):
folder = q.data.split("_", 1)[1]
user_state[uid] = {"mode": "rename_folder", "folder": folder}
kb = [[InlineKeyboardButton("ā Cancel", callback_data="cancel")]]
return await q.message.edit_text(f"āļø {folder} ke liye naya naam bhejo:", parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb))
except Exception as e:
print("BTN ERROR:", e)
# =========================
# š¤ FILE HANDLING (All types)
# =========================
async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
uid = update.effective_user.id
if uid not in user_state or user_state[uid].get("mode") != "upload":
await update.message.reply_text("ā Upload karne ke liye pehle folder me jake 'Upload Here' pe click karo.")
return
state = user_state[uid]
folder = state["folder"]
data = get_data(uid)
msg = update.message
# Safely extract file properties
if getattr(msg, 'document', None):
doc = msg.document
original_name = doc.file_name or f"document_{int(time.time())}"
elif getattr(msg, 'photo', None):
doc = msg.photo[-1] # Highest resolution
original_name = f"photo_{int(time.time())}.jpg"
elif getattr(msg, 'video', None):
doc = msg.video
original_name = doc.file_name or f"video_{int(time.time())}.mp4"
elif getattr(msg, 'audio', None):
doc = msg.audio
original_name = doc.file_name or f"audio_{int(time.time())}.mp3"
elif getattr(msg, 'voice', None):
doc = msg.voice
original_name = f"voice_{int(time.time())}.ogg"
else:
return await update.message.reply_text("ā Unsupported file format.")
used = sum(f.get("file_size", 0) for f in data["files"])
if not is_admin(uid) and used + getattr(doc, 'file_size', 0) > MAX_STORAGE:
return await update.message.reply_text("ā Storage limit reach ho gayi hai (15GB max).")
# Copy original message to Group to persist file
await msg.copy(chat_id=GROUP_ID)
file_id = doc.file_id
name = f"{uid}_{int(time.time())}_{random.randint(100,999)}"
entry = {
"file_name": name,
"original_name": original_name,
"file_id": file_id,
"file_size": getattr(doc, 'file_size', 0),
"upload_date": str(datetime.now())
}
data["files"].append(entry)
data["index"][name] = entry
data["folders"][folder]["files"].append(name)
save_data(uid, data)
user_state.pop(uid, None) # Safe pop
# ā
HTML use kar rahe hain taaki _ (underscore) ki wajah se markdown error na aaye
try:
await update.message.reply_text(f"ā
{original_name} successfully upload ho gayi!", parse_mode="HTML")
except Exception as e:
print(f"MSG ERROR: {e}")
await update.message.reply_text("ā
File successfully upload ho gayi!")
time.sleep(1) # Chhota pause better user experience ke liye
await send_main_menu(context, uid)
except Exception as e:
print("UPLOAD ERROR:", e)
# =========================
# š TEXT HANDLING
# =========================
async def handle_text(update: Update, context: ContextTypes.DEFAULT_TYPE):
try:
uid = update.effective_user.id
if uid not in user_state:
return
state = user_state[uid]
data = get_data(uid)
text_input = update.message.text
# š Search Logic
if state["mode"] == "search":
query = text_input.lower()
results = []
for fid, f in data["index"].items():
if query in f["original_name"].lower():
results.append(f)
if not results:
await update.message.reply_text("ā Koi aisi file nahi mili.")
else:
kb = []
for r in results[:10]: # Top 10 results dikhayega
kb.append([InlineKeyboardButton(f"{get_icon(r['original_name'])} {r['original_name']}", callback_data=f"file_{r['file_name']}")])
kb.append([InlineKeyboardButton("ā Clear Search", callback_data="cancel")])
await update.message.reply_text(f"š Top results for '{text_input}':", parse_mode="HTML", reply_markup=InlineKeyboardMarkup(kb))
user_state.pop(uid, None)
return
# š Create Folder Logic
if state["mode"] == "new_folder":
name = text_input
parent = state["parent"]
if name in data["folders"]:
await update.message.reply_text("ā Is naam ka folder already hai.")
else:
data["folders"][name] = {"files": [], "folders": [], "parent": parent}
data["folders"][parent]["folders"].append(name)
save_data(uid, data)
await update.message.reply_text(f"ā
Folder {name} ban gaya!", parse_mode="HTML")
user_state.pop(uid, None)
return await send_main_menu(context, uid)
# āļø Rename Folder Logic
if state["mode"] == "rename_folder":
old_name = state["folder"]
new_name = text_input
parent = data["folders"][old_name]["parent"]
if new_name in data["folders"]:
await update.message.reply_text("ā Ye naam already exists.")
else:
data["folders"][new_name] = data["folders"].pop(old_name)
if parent:
idx = data["folders"][parent]["folders"].index(old_name)
data["folders"][parent]["folders"][idx] = new_name
for sub in data["folders"][new_name]["folders"]:
data["folders"][sub]["parent"] = new_name
save_data(uid, data)
await update.message.reply_text(f"ā
Folder ka naam {new_name} ho gaya.", parse_mode="HTML")
user_state.pop(uid, None)
return await send_main_menu(context, uid)
except Exception as e:
print("TEXT ERROR:", e)
# =========================
# ā ļø ERROR HANDLER
# =========================
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
print(f"ā ļø EXCEPTION AAYA HAI: {context.error}")
# =========================
# š RUN
# =========================
if not BOT_TOKEN:
print("ā ERROR: Bot token nahi mila! Apna token set karo.")
exit(1)
app = ApplicationBuilder().token(BOT_TOKEN).build()
app.add_handler(CommandHandler("start", start))
app.add_handler(CallbackQueryHandler(buttons))
# ā
Naya Filter jo Photos, Videos, Documents aur Audio sab catch karega bina kisi issue ke
app.add_handler(MessageHandler(filters.ALL & ~filters.TEXT & ~filters.COMMAND, handle_file))
app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text))
app.add_error_handler(error_handler)
print("š„ FINAL BOT RUNNING - FULLY UPDATED & FIXED")
app.run_polling()