|
|
| BOT_TOKEN = "6740023413:AAHZ5iEiXjkY7IGwljg37llFbekKLbIVkkw" |
|
|
| |
| GROUP_ID = -1003799294466 |
| import json |
| import time |
| from datetime import datetime |
| from telegram import ( |
| Update, |
| InlineKeyboardButton, |
| InlineKeyboardMarkup |
| ) |
| from telegram.ext import ( |
| ApplicationBuilder, |
| MessageHandler, |
| CommandHandler, |
| CallbackQueryHandler, |
| ContextTypes, |
| filters |
| ) |
|
|
|
|
| waiting_for_file = set() |
|
|
|
|
| |
| async def find_user_json(context, user_id): |
| async for msg in context.bot.get_chat_history(GROUP_ID, limit=200): |
| if msg.document and msg.document.file_name == f"{user_id}.json": |
| return msg.document.file_id |
| return None |
|
|
|
|
| |
| async def load_data(context, user_id): |
| file_id = await find_user_json(context, user_id) |
|
|
| if not file_id: |
| return {"files": []} |
|
|
| file = await context.bot.get_file(file_id) |
| content = await file.download_as_bytearray() |
|
|
| try: |
| data = json.loads(content.decode("utf-8")) |
| if "files" not in data: |
| return {"files": []} |
| return data |
| except: |
| return {"files": []} |
|
|
|
|
| |
| async def save_data(context, user_id, data): |
| json_bytes = json.dumps(data, indent=4).encode("utf-8") |
|
|
| await context.bot.send_document( |
| chat_id=GROUP_ID, |
| document=json_bytes, |
| filename=f"{user_id}.json", |
| caption=f"π Updated Index {user_id}" |
| ) |
|
|
|
|
| |
| async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| keyboard = [ |
| [InlineKeyboardButton("π€ Upload", callback_data="upload")], |
| [InlineKeyboardButton("π My Files", callback_data="files")], |
| [InlineKeyboardButton("π Stats", callback_data="stats")] |
| ] |
|
|
| await update.message.reply_text( |
| "π Cloud Storage Bot (No Server Storage βοΈ)", |
| reply_markup=InlineKeyboardMarkup(keyboard) |
| ) |
|
|
|
|
| |
| async def buttons(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| query = update.callback_query |
| await query.answer() |
|
|
| user_id = query.from_user.id |
|
|
| if query.data == "upload": |
| waiting_for_file.add(user_id) |
| await query.message.reply_text("π Send file to upload") |
|
|
| elif query.data == "files": |
| data = await load_data(context, user_id) |
|
|
| if not data["files"]: |
| await query.message.reply_text("β No files") |
| return |
|
|
| msg = "π Your Files:\n\n" |
| for f in data["files"][-10:]: |
| msg += f"β’ {f['file_name']}\n" |
|
|
| await query.message.reply_text(msg) |
|
|
| elif query.data == "stats": |
| data = await load_data(context, user_id) |
| await query.message.reply_text(f"π Total files: {len(data['files'])}") |
|
|
|
|
| |
| async def save_command(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| waiting_for_file.add(update.effective_user.id) |
| await update.message.reply_text("π Send file to save") |
|
|
|
|
| |
| async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| user_id = update.effective_user.id |
|
|
| if user_id not in waiting_for_file: |
| return |
|
|
| doc = update.message.document |
| if not doc: |
| return |
|
|
| try: |
| original_name = doc.file_name |
| ext = original_name.split('.')[-1] if '.' in original_name else "dat" |
|
|
| timestamp = int(time.time()) |
| upload_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
|
| new_name = f"{user_id}_{timestamp}.{ext}" |
|
|
| |
| sent = await context.bot.send_document( |
| chat_id=GROUP_ID, |
| document=doc.file_id, |
| filename=new_name |
| ) |
|
|
| |
| data = await load_data(context, user_id) |
|
|
| entry = { |
| "file_name": new_name, |
| "original_name": original_name, |
| "file_id": sent.document.file_id, |
| "file_size": doc.file_size, |
| "upload_date": upload_date, |
| "timestamp": timestamp |
| } |
|
|
| data["files"].append(entry) |
|
|
| |
| await save_data(context, user_id, data) |
|
|
| await update.message.reply_text("β
File saved successfully βοΈ") |
|
|
| except Exception as e: |
| print(e) |
| await update.message.reply_text(f"β Error: {e}") |
|
|
| waiting_for_file.discard(user_id) |
|
|
|
|
| |
| async def myfiles(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| user_id = update.effective_user.id |
| data = await load_data(context, user_id) |
|
|
| if not data["files"]: |
| await update.message.reply_text("β No files found") |
| return |
|
|
| msg = "π Your Files:\n\n" |
| for f in data["files"]: |
| msg += f"β’ {f['file_name']} ({f['upload_date']})\n" |
|
|
| await update.message.reply_text(msg) |
|
|
|
|
| |
| async def get_file(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| user_id = update.effective_user.id |
| data = await load_data(context, user_id) |
|
|
| if not context.args: |
| await update.message.reply_text("Usage: /get filename") |
| return |
|
|
| filename = context.args[0] |
|
|
| for f in data["files"]: |
| if f["file_name"] == filename: |
| await context.bot.send_document( |
| chat_id=update.effective_chat.id, |
| document=f["file_id"] |
| ) |
| return |
|
|
| await update.message.reply_text("β File not found") |
|
|
|
|
| |
| async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE): |
| user_id = update.effective_user.id |
| data = await load_data(context, user_id) |
|
|
| total = len(data["files"]) |
| total_size = sum(f["file_size"] for f in data["files"]) |
|
|
| await update.message.reply_text( |
| f"π Files: {total}\nπΎ Size: {round(total_size/1024,2)} KB" |
| ) |
|
|
|
|
| |
| app = ApplicationBuilder().token(BOT_TOKEN).build() |
|
|
| app.add_handler(CommandHandler("start", start)) |
| app.add_handler(CommandHandler("save", save_command)) |
| app.add_handler(CommandHandler("myfiles", myfiles)) |
| app.add_handler(CommandHandler("get", get_file)) |
| app.add_handler(CommandHandler("stats", stats)) |
|
|
| app.add_handler(CallbackQueryHandler(buttons)) |
| app.add_handler(MessageHandler(filters.Document.ALL, handle_file)) |
|
|
| print("π₯ CLOUD STORAGE BOT RUNNING...") |
| app.run_polling() |