# tgtest / ap.p.py
# Uploaded by ulduldp.
# Commit 91d9d9d (verified): "Rename app.py to ap.p.py"
import os

# SECURITY: a bot token is a credential and must not live in source control.
# The literal below has already been published, so it should be revoked via
# @BotFather and this fallback removed. It is kept only so existing
# deployments keep starting; set the BOT_TOKEN environment variable to
# override it.
BOT_TOKEN = (
    os.environ.get("BOT_TOKEN")
    or "6740023413:AAHZ5iEiXjkY7IGwljg37llFbekKLbIVkkw"
)

# Chat that receives the uploaded documents and the per-user JSON indexes.
# Overridable through the GROUP_ID environment variable; the default is the
# previously hard-coded value.
GROUP_ID = int(os.environ.get("GROUP_ID") or -1003799294466)
import json
import time
from datetime import datetime
from telegram import (
Update,
InlineKeyboardButton,
InlineKeyboardMarkup
)
from telegram.ext import (
ApplicationBuilder,
MessageHandler,
CommandHandler,
CallbackQueryHandler,
ContextTypes,
filters
)
# IDs of users who pressed "Upload" or sent /save; handle_file only stores a
# document when its sender is in this set. Module-level state, so it is empty
# again whenever the process restarts.
waiting_for_file = set()
# πŸ” FIND USER JSON IN GROUP
async def find_user_json(context, user_id):
async for msg in context.bot.get_chat_history(GROUP_ID, limit=200):
if msg.document and msg.document.file_name == f"{user_id}.json":
return msg.document.file_id
return None
# πŸ“₯ LOAD JSON FROM GROUP
async def load_data(context, user_id):
file_id = await find_user_json(context, user_id)
if not file_id:
return {"files": []}
file = await context.bot.get_file(file_id)
content = await file.download_as_bytearray()
try:
data = json.loads(content.decode("utf-8"))
if "files" not in data:
return {"files": []}
return data
except:
return {"files": []}
# πŸ“€ SAVE JSON TO GROUP
async def save_data(context, user_id, data):
json_bytes = json.dumps(data, indent=4).encode("utf-8")
await context.bot.send_document(
chat_id=GROUP_ID,
document=json_bytes,
filename=f"{user_id}.json",
caption=f"πŸ“ Updated Index {user_id}"
)
# /start: show the main menu.
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with the greeting and the inline main-menu keyboard."""
    # (button label, callback_data) pairs, one button per keyboard row.
    menu = (
        ("πŸ“€ Upload", "upload"),
        ("πŸ“‚ My Files", "files"),
        ("πŸ“Š Stats", "stats"),
    )
    rows = [
        [InlineKeyboardButton(label, callback_data=action)]
        for label, action in menu
    ]
    await update.message.reply_text(
        "πŸš€ Cloud Storage Bot (No Server Storage ☁️)",
        reply_markup=InlineKeyboardMarkup(rows),
    )
# Inline-keyboard callbacks from the main menu.
async def buttons(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Dispatch a main-menu button press by its callback data.

    "upload" marks the user as waiting for a file, "files" lists the last
    ten stored file names, and "stats" reports the number of stored files.
    Any other callback data is acknowledged and otherwise ignored.
    """
    query = update.callback_query
    await query.answer()
    user_id = query.from_user.id
    action = query.data

    if action == "upload":
        waiting_for_file.add(user_id)
        await query.message.reply_text("πŸ“ Send file to upload")
        return

    if action == "files":
        index = await load_data(context, user_id)
        if not index["files"]:
            await query.message.reply_text("❌ No files")
            return
        # Only the ten most recent entries are shown.
        listing = "".join(
            f"β€’ {entry['file_name']}\n" for entry in index["files"][-10:]
        )
        await query.message.reply_text("πŸ“‚ Your Files:\n\n" + listing)
        return

    if action == "stats":
        index = await load_data(context, user_id)
        await query.message.reply_text(
            f"πŸ“Š Total files: {len(index['files'])}"
        )
# /save: arm the upload flow for the sender.
async def save_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Mark the sender as waiting for a file and prompt them to send one."""
    sender = update.effective_user
    waiting_for_file.add(sender.id)
    await update.message.reply_text("πŸ“ Send file to save")
# Incoming documents: store the file and update the sender's index.
async def handle_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Store a document sent by a user who previously requested an upload.

    Documents from users not in ``waiting_for_file`` are ignored. Otherwise
    the document is forwarded to GROUP_ID, an entry describing it is
    appended to the user's JSON index, and the index is saved again. Any
    failure is printed and reported to the user. The user is always removed
    from ``waiting_for_file`` afterwards, even if the error reply fails.
    """
    user_id = update.effective_user.id
    if user_id not in waiting_for_file:
        return
    doc = update.message.document
    if not doc:
        return

    try:
        # Telegram documents may come without a file name.
        original_name = doc.file_name or "unnamed"
        # Text after the last dot is the extension; fall back to "dat" when
        # there is no dot or nothing follows it (e.g. "archive.").
        _, dot, suffix = original_name.rpartition(".")
        ext = suffix if dot and suffix else "dat"

        timestamp = int(time.time())
        # Naive local time of the machine running the bot.
        upload_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        new_name = f"{user_id}_{timestamp}.{ext}"

        # Step 1: copy the file into the storage chat first, so the index
        # never references a document that was not stored.
        sent = await context.bot.send_document(
            chat_id=GROUP_ID,
            document=doc.file_id,
            filename=new_name,
        )

        # Step 2: load the current index and append the new entry.
        data = await load_data(context, user_id)
        data["files"].append(
            {
                "file_name": new_name,
                "original_name": original_name,
                "file_id": sent.document.file_id,
                # file_size is optional; store 0 so size totals stay numeric.
                "file_size": doc.file_size or 0,
                "upload_date": upload_date,
                "timestamp": timestamp,
            }
        )

        # Step 3: write the updated index back to the storage chat.
        await save_data(context, user_id, data)
        await update.message.reply_text("βœ… File saved successfully ☁️")
    except Exception as e:
        # Top-level handler boundary: report the failure instead of crashing.
        print(e)
        await update.message.reply_text(f"❌ Error: {e}")
    finally:
        # Runs even when the error reply above raises.
        waiting_for_file.discard(user_id)
# /myfiles: list every stored file of the sender.
async def myfiles(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the name and upload date of each file in the user's index.

    Telegram rejects text messages longer than 4096 characters, so a long
    listing is split across several replies instead of failing outright.
    """
    user_id = update.effective_user.id
    data = await load_data(context, user_id)
    if not data["files"]:
        await update.message.reply_text("❌ No files found")
        return

    # Stay a little below the 4096-character limit.
    max_len = 4000
    chunk = "πŸ“‚ Your Files:\n\n"
    for f in data["files"]:
        line = f"β€’ {f['file_name']} ({f['upload_date']})\n"
        if chunk and len(chunk) + len(line) > max_len:
            await update.message.reply_text(chunk)
            chunk = ""
        chunk += line
    await update.message.reply_text(chunk)
# /get <filename>: send a stored file back to the user.
async def get_file(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Send the stored document whose ``file_name`` equals the first argument.

    Replies with a usage hint when no argument is given, and with a
    not-found message when the user's index has no matching entry.
    """
    # Validate the arguments first so a malformed command does not trigger
    # an index download.
    if not context.args:
        await update.message.reply_text("Usage: /get filename")
        return
    filename = context.args[0]

    user_id = update.effective_user.id
    data = await load_data(context, user_id)
    # .get() tolerates index entries that lack a "file_name" key.
    match = next(
        (f for f in data["files"] if f.get("file_name") == filename),
        None,
    )
    if match is None:
        await update.message.reply_text("❌ File not found")
        return

    await context.bot.send_document(
        chat_id=update.effective_chat.id,
        document=match["file_id"],
    )
# /stats: file count and total size for the sender.
async def stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply with the number of stored files and their combined size in KB."""
    user_id = update.effective_user.id
    data = await load_data(context, user_id)
    files = data["files"]
    total = len(files)
    # Entries whose size is missing or None count as 0 bytes, so sum() does
    # not raise TypeError.
    total_size = sum(f.get("file_size") or 0 for f in files)
    await update.message.reply_text(
        f"πŸ“Š Files: {total}\nπŸ’Ύ Size: {round(total_size/1024,2)} KB"
    )
# Build the application, register the handlers, and start polling.
app = ApplicationBuilder().token(BOT_TOKEN).build()

# Command handlers are registered first, in this order, followed by the
# inline-button handler and the document handler.
for _command, _callback in (
    ("start", start),
    ("save", save_command),
    ("myfiles", myfiles),
    ("get", get_file),
    ("stats", stats),
):
    app.add_handler(CommandHandler(_command, _callback))

app.add_handler(CallbackQueryHandler(buttons))
app.add_handler(MessageHandler(filters.Document.ALL, handle_file))

print("πŸ”₯ CLOUD STORAGE BOT RUNNING...")
app.run_polling()