Bsbsben / app.py
soxogvv's picture
Update app.py
a0bcda4 verified
import os
import glob
import importlib
import asyncio
from flask import Flask
from threading import Thread
from pymongo import MongoClient
from telethon.sync import TelegramClient
from telethon.sessions import StringSession
from telethon import events, Button
from datetime import datetime
import time
import psutil
import nest_asyncio
import sys
import requests
import subprocess
import random
nest_asyncio.apply()
# === API SETS ===
API_CREDENTIALS = [
{"API_ID": int(os.getenv("API_ID1")), "API_HASH": os.getenv("API_HASH1")},
]
BOT_TOKEN = os.getenv("BOT_TOKEN")
MONGO_URL = os.getenv("MONGO_URL")
MONGO_DB = os.getenv("MONGO_DB", "eleg4am_bot")
ADMIN_ID = int(os.getenv("ADMIN_ID"))
flask_app = Flask(__name__)
@flask_app.route("/")
def home():
return "Bot is alive!"
Thread(target=lambda: flask_app.run(host="0.0.0.0", port=7860)).start()
mongo = MongoClient(MONGO_URL)
db = mongo[MONGO_DB]
session_coll = db["sessions"]
all_bots = []
session_start_times = {}
paused_sessions = set()
controller_bot = TelegramClient("controller_bot", API_CREDENTIALS[0]["API_ID"], API_CREDENTIALS[0]["API_HASH"]).start(bot_token=BOT_TOKEN)
import os
import glob
import importlib
import sys
def load_plugins():
plugin_path = "plugins"
for path in glob.glob(f"{plugin_path}/*.py"):
filename = os.path.basename(path)
if filename == "__init__.py":
continue
module_name = f"{plugin_path}.{filename[:-3]}" # removes .py
try:
if module_name in sys.modules:
importlib.reload(sys.modules[module_name])
else:
importlib.import_module(module_name)
except Exception as e:
print(f"[PLUGIN ERROR] {filename}: {e}")
async def start_userbot(user_id, session_str, api_index=0):
try:
creds = API_CREDENTIALS[api_index]
bot = TelegramClient(
StringSession(session_str),
creds["API_ID"],
creds["API_HASH"],
device_model="Redmi 7"
)
await bot.start()
# Inject bot into the `plugins` namespace for dynamic plugin access
import plugins
plugins.kanha_bot = bot
from telethon import events
# Define `kanha_cmd` as a valid decorator for plugins
def kanha_cmd(**kwargs):
return events.NewMessage(outgoing=True, **kwargs)
plugins.kanha_cmd = kanha_cmd
# Load plugins after bot + decorator are ready
load_plugins()
# Add to active bot list
all_bots.append(bot)
session_start_times[user_id] = time.time()
# Run bot in background
asyncio.create_task(bot.run_until_disconnected())
except Exception as e:
print(f"❌ Failed to start userbot {user_id}: {e}")
from telethon.tl.custom import Conversation
@controller_bot.on(events.NewMessage(pattern="/help"))
async def help_command(event):
if event.sender_id != ADMIN_ID:
return
help_text = """
πŸ“– **Bot Command Guide**
πŸ”Ή **Session Management**
- `/addsession` β†’ Add a new user StringSession
- `/changesession <name>` β†’ Replace session for a user
- `/listsessions` β†’ List all active sessions
- `/pause <name>` β†’ Pause a session (disables logic)
- `/resume <name>` β†’ Resume a paused session
- `/reloadplugin` β†’ Reload all plugins
- `/reloadplugin <name>` β†’ Reload plugin for a specific user
- `/stats` β†’ Show min/max uptime
- `/stats <name>` β†’ Show uptime for a specific session
- `/status` β†’ Show system CPU and RAM usage
πŸ”Ή **Danger Zone**
- You can delete sessions using inline buttons inside `/listsessions`
- Session updates require confirmation with βœ… Confirm button
πŸ”’ **Note:** Only the admin can run these commands.
"""
await event.respond(help_text)
@controller_bot.on(events.NewMessage(pattern=r"/changesession (.+)"))
async def change_session(event):
if event.sender_id != ADMIN_ID:
return
query = event.pattern_match.group(1).strip().lower()
matches = [s for s in session_coll.find() if query in s.get("name", "").lower()]
if not matches:
await event.reply("❌ No matching session.")
return
if len(matches) == 1:
target_session = matches[0]
else:
buttons = [
[Button.inline(f"{s['name']} ({s['user_id']})", data=f"chgs_{s['user_id']}")]
for s in matches
]
await event.respond("Select session to change:", buttons=buttons)
return
async with controller_bot.conversation(event.chat_id, timeout=300) as conv:
await conv.send_message("✍️ Send new StringSession for update:")
msg = await conv.get_response()
new_str = msg.raw_text.strip()
try:
tmp = TelegramClient(StringSession(new_str), API_CREDENTIALS[0]["API_ID"], API_CREDENTIALS[0]["API_HASH"])
await tmp.start()
me = await tmp.get_me()
new_user_id = str(me.id)
name = me.first_name or me.username or "Unknown"
await tmp.disconnect()
except Exception as e:
await conv.send_message(f"❌ Invalid session: {e}")
return
confirm_text = f"⚠️ Replace old session?\n\nNew session:\nπŸ‘€ `{name}`\nπŸ†” `{new_user_id}`"
buttons = [
[Button.inline("βœ… Confirm", data=f"confirmsession_{target_session['user_id']}|{new_str}"),
Button.inline("❌ Cancel", data="cancel")]
]
await conv.send_message(confirm_text, buttons=buttons)
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"chgs_")))
async def select_session_for_change(event):
user_id = event.data.decode().split("_", 1)[1]
session = session_coll.find_one({"user_id": user_id})
if session:
await event.respond(f"✍️ Send new StringSession for `{session.get('name')}`:")
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"confirmsession_")))
async def confirm_session_change(event):
raw = event.data.decode().split("_", 1)[1]
old_uid, new_session = raw.split("|", 1)
try:
temp = TelegramClient(StringSession(new_session), API_CREDENTIALS[0]["API_ID"], API_CREDENTIALS[0]["API_HASH"],device_model="Redmi 7")
await temp.start()
me = await temp.get_me()
new_name = me.first_name or me.username or "Unknown"
new_uid = str(me.id)
await temp.disconnect()
session_coll.update_one(
{"user_id": old_uid},
{"$set": {"session": new_session, "user_id": new_uid, "name": new_name}},
upsert=True
)
await event.edit(f"βœ… Session updated for `{new_name}` (`{new_uid}`)")
# Start or restart the updated userbot
await start_userbot(new_uid, new_session, session_coll.find_one({"user_id": new_uid})["api_index"])
except Exception as e:
await event.edit(f"❌ Failed to update session: {e}")
@controller_bot.on(events.CallbackQuery(data=b"cancel"))
async def cancel_action(event):
await event.edit("❎ Action cancelled.")
async def load_all_sessions():
sessions = list(session_coll.find())
for i, sess in enumerate(sessions):
api_index = sess.get("api_index", i // 15)
await start_userbot(sess["user_id"], sess["session"], api_index)
from telethon.tl.custom import Conversation
from telethon import TelegramClient, events
from telethon.sessions import StringSession
from session_converter import SessionManager
@controller_bot.on(events.NewMessage(pattern="/addsession"))
async def addsession(event):
if event.sender_id != ADMIN_ID:
return
async with controller_bot.conversation(event.chat_id, timeout=300) as conv:
await conv.send_message("πŸ“© Send your StringSession:")
msg = await conv.get_response()
session_str = msg.raw_text.strip()
# Step 1: Detect and convert Pyrogram β†’ Telethon
try:
if not session_str.startswith(("1A", "1B")): # Likely Pyrogram
try:
session_manager = SessionManager.from_pyrogram_string_session(session_str)
session_str = session_manager.telethon_string_session()
await conv.send_message("πŸ”„ Converted Pyrogram session to Telethon.")
except Exception as e:
await conv.send_message(f"❌ Pyrogram conversion failed:\n`{e}`")
return
else:
await conv.send_message("βœ… Detected Telethon session.")
except Exception as e:
await conv.send_message(f"❌ Session detection failed:\n`{e}`")
return
# Step 2: Find available API slot
selected_index = -1
for i, creds in enumerate(API_CREDENTIALS):
count = session_coll.count_documents({"api_index": i})
if count < 15:
selected_index = i
break
if selected_index == -1:
await conv.send_message("❌ All API slots are full (15 accounts per slot). Please add more API credentials.")
return
api_id = API_CREDENTIALS[selected_index]["API_ID"]
api_hash = API_CREDENTIALS[selected_index]["API_HASH"]
# Step 3: Validate and connect
try:
temp_client = TelegramClient(StringSession(session_str), api_id, api_hash, device_model="Redmi 7")
await temp_client.start()
me = await temp_client.get_me()
user_id = str(me.id)
name = me.first_name or me.username or "Unnamed"
# Step 4: Check existing session
if session_coll.find_one({"user_id": user_id}):
await conv.send_message("⚠️ This account is already added.\nUpdating session...")
session_coll.update_one(
{"user_id": user_id},
{"$set": {
"session": session_str,
"name": name,
"api_index": selected_index
}}
)
await temp_client.disconnect()
await start_userbot(user_id, session_str, selected_index)
return
# Step 5: Store in DB
session_coll.update_one(
{"user_id": user_id},
{"$set": {
"session": session_str,
"name": name,
"user_id": user_id,
"api_index": selected_index
}},
upsert=True
)
await conv.send_message(f"βœ… Session added for `{name}` (`{user_id}`) using API #{selected_index}")
await temp_client.disconnect()
await start_userbot(user_id, session_str, selected_index)
except Exception as e:
await conv.send_message(f"❌ Invalid session:\n`{e}`")
from telethon import Button, events
MAX_PER_PAGE = 20 # 20 accounts per page, 2 per row => 10 rows
@controller_bot.on(events.NewMessage(pattern="/listsessions"))
async def list_sessions(event):
if event.sender_id != ADMIN_ID:
return
await send_session_page(event, 0) # Start from page 0
async def send_session_page(event, page):
sessions = list(session_coll.find())
# Sort alphabetically by name
sessions.sort(key=lambda s: s.get("name", "").lower())
total_pages = (len(sessions) - 1) // MAX_PER_PAGE + 1
start_index = page * MAX_PER_PAGE
end_index = start_index + MAX_PER_PAGE
page_sessions = sessions[start_index:end_index]
msg = "πŸ“„ **All Sessions:**\n\n"
# Prepare inline buttons (2 per row)
buttons = []
row = []
for idx, sess in enumerate(page_sessions, start=start_index + 1):
name = sess.get("name", "unknown")
uid = sess.get("user_id")
username = sess.get("username")
paused = "⏸️" if uid in paused_sessions else "βœ…"
# Clickable link if username exists
if username:
clickable_name = f"[{name}](https://t.me/{username})"
else:
clickable_name = name # fallback to plain text if no username
msg += f"{idx}. {paused} {clickable_name} (`{uid}`)\n"
row.append(Button.inline(name, data=f"view_{uid}"))
if len(row) == 2: # 2 per row
buttons.append(row)
row = []
if row:
buttons.append(row) # last partial row if exists
# Navigation buttons
nav_buttons = []
if page > 0:
nav_buttons.append(Button.inline("◀️", data=f"prevpage_{page}"))
if page < total_pages - 1:
nav_buttons.append(Button.inline("▢️", data=f"nextpage_{page}"))
if nav_buttons:
buttons.append(nav_buttons)
# Send or edit message
if isinstance(event, events.NewMessage.Event):
await event.respond(msg, buttons=buttons, link_preview=False)
else:
await event.edit(msg, buttons=buttons, link_preview=False)
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"nextpage_")))
async def next_page(event):
page = int(event.data.decode().split("_")[1])
await send_session_page(event, page + 1)
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"prevpage_")))
async def prev_page(event):
page = int(event.data.decode().split("_")[1])
await send_session_page(event, page - 1)
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"view_")))
async def view_session_details(event):
uid = event.data.decode().split("_", 1)[1]
session = session_coll.find_one({"user_id": uid})
if not session:
await event.edit("❌ Session not found.")
return
name = session.get("name", "Unknown")
username = session.get("username")
details = f"🧾 **Session Info**\n\nπŸ‘€ Name: `{name}`"
if username:
details += f"\nπŸ“› Username: @{username}"
details += f"\nπŸ†” ID: `{uid}`"
buttons = [
[Button.inline("βœ… Confirm Delete", data=f"confirmdel_{uid}"), Button.inline("❌ Cancel", data="cancel")]
]
await event.edit(details, buttons=buttons)
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"confirmdel_")))
async def confirm_delete(event):
uid = event.data.decode().split("_", 1)[1]
result = session_coll.delete_one({"user_id": uid})
if result.deleted_count:
await event.edit(f"βœ… Session `{uid}` deleted successfully.")
else:
await event.edit("❌ Could not delete session.")
@controller_bot.on(events.CallbackQuery(data=b"cancel"))
async def cancel_action(event):
await event.edit("❎ Action cancelled.")
import os
import psutil
import time
# Store process object
process = psutil.Process(os.getpid())
# Store initial network counters (bytes sent/received)
net_start = psutil.net_io_counters()
start_time = time.time()
def format_size(bytes_val):
"""Format bytes into human-readable GB or MB."""
gb = bytes_val / (1024 ** 3)
if gb >= 1:
return f"{gb:.2f} GB"
mb = bytes_val / (1024 ** 2)
return f"{mb:.2f} MB"
@controller_bot.on(events.NewMessage(pattern="/status"))
async def app_status(event):
if event.sender_id != ADMIN_ID:
return
# CPU usage only for this process
cpu_usage = process.cpu_percent(interval=1)
# Memory usage in bytes β†’ GB
mem_info = process.memory_info()
ram_used_gb = mem_info.rss / (1024 ** 3)
total_ram_gb = psutil.virtual_memory().total / (1024 ** 3)
ram_percent = (mem_info.rss / psutil.virtual_memory().total) * 100
# Network usage since start
net_now = psutil.net_io_counters()
bytes_sent = net_now.bytes_sent - net_start.bytes_sent
bytes_recv = net_now.bytes_recv - net_start.bytes_recv
await event.respond(
f"πŸ“Š **App Resource Usage**\n"
f"🧠 CPU: `{cpu_usage:.2f}%`\n"
f"πŸ’Ύ RAM: `{ram_used_gb:.2f} GB / {total_ram_gb:.2f} GB` ({ram_percent:.2f}%)\n"
f"⬇️ Downloaded: `{format_size(bytes_recv)}`\n"
f"⬆️ Uploaded: `{format_size(bytes_sent)}`\n"
f"⏳ Uptime: `{int(time.time() - start_time)}s`"
)
@controller_bot.on(events.NewMessage(pattern=r"/pause (.+)"))
async def pause_cmd(event):
if event.sender_id != ADMIN_ID:
return
query = event.pattern_match.group(1).strip().lower()
matches = [s for s in session_coll.find() if query in s.get("name", "").lower()]
if not matches:
await event.reply("❌ No matching session found.")
return
if len(matches) == 1:
user_id = matches[0]["user_id"]
name = matches[0].get("name", "Unknown")
if matches[0].get("paused"):
await event.reply(f"⚠️ `{name}` is already paused.")
return
session_coll.update_one({"user_id": user_id}, {"$set": {"paused": True}})
await event.reply(f"⏸️ Session `{name}` is now paused.")
else:
buttons = [
[Button.inline(f"{s['name']} ({s['user_id']})", data=f"pauseone_{s['user_id']}")]
for s in matches
]
await event.respond("Select a session to pause:", buttons=buttons)
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"pauseone_")))
async def pause_cb_handler(event):
user_id = event.data.decode().split("_", 1)[1]
session = session_coll.find_one({"user_id": user_id})
if not session:
await event.edit("❌ Session not found.")
return
name = session.get("name", "Unknown")
if session.get("paused"):
await event.edit(f"⚠️ `{name}` is already paused.")
return
session_coll.update_one({"user_id": user_id}, {"$set": {"paused": True}})
await event.edit(f"⏸️ Session `{name}` is now paused.")
@controller_bot.on(events.NewMessage(pattern=r"/resume (.+)"))
async def resume_cmd(event):
if event.sender_id != ADMIN_ID:
return
query = event.pattern_match.group(1).strip().lower()
matches = [s for s in session_coll.find() if query in s.get("name", "").lower()]
if not matches:
await event.reply("❌ No matching session.")
return
if len(matches) == 1:
user_id = matches[0]["user_id"]
name = matches[0].get("name", "Unknown")
if not matches[0].get("paused"):
await event.reply(f"⚠️ `{name}` is already active.")
return
session_coll.update_one({"user_id": user_id}, {"$set": {"paused": False}})
paused_sessions.discard(user_id) # Optional if you still use in-memory
await event.reply(f"▢️ Resumed `{name}`")
else:
buttons = [
[Button.inline(f"{s['name']} ({s['user_id']})", data=f"resumeone_{s['user_id']}")]
for s in matches
]
await event.respond("Select session to resume:", buttons=buttons)
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"resumeone_")))
async def resume_cb(event):
user_id = event.data.decode().split("_", 1)[1]
session = session_coll.find_one({"user_id": user_id})
if not session:
await event.edit("❌ Session not found.")
return
name = session.get("name", "Unknown")
if not session.get("paused"):
await event.edit(f"⚠️ `{name}` is already active.")
return
session_coll.update_one({"user_id": user_id}, {"$set": {"paused": False}})
paused_sessions.discard(user_id)
await event.edit(f"▢️ Resumed `{name}`")
@controller_bot.on(events.NewMessage(pattern=r"/stats(?: (.*))?"))
async def stats_handler(event):
if event.sender_id != ADMIN_ID:
return
arg = event.pattern_match.group(1)
sessions = list(session_coll.find())
matched = []
if arg:
query = arg.strip().lower()
for sess in sessions:
name = sess.get("name", "").lower()
if query in name:
matched.append(sess)
if not matched:
await event.reply("❌ No matching session.")
return
for sess in matched:
user_id = sess["user_id"]
name = sess.get("name", user_id)
api_index = sess.get("api_index", 0)
uptime = time.time() - session_start_times.get(user_id, time.time())
uptime_str = str(datetime.utcfromtimestamp(uptime).strftime("%Hh %Mm %Ss"))
await event.respond(f"πŸ“Š **Stats for {name}**\nID: `{user_id}`\nAPI Index: `{api_index}`\nUptime: `{uptime_str}`")
else:
uptimes = [time.time() - session_start_times.get(s["user_id"], time.time()) for s in sessions]
if not uptimes:
await event.respond("❌ No uptime data.")
return
min_u = min(uptimes)
max_u = max(uptimes)
min_str = str(datetime.utcfromtimestamp(min_u).strftime("%Hh %Mm %Ss"))
max_str = str(datetime.utcfromtimestamp(max_u).strftime("%Hh %Mm %Ss"))
await event.respond(f"πŸ“ˆ **Uptime Stats (All Sessions)**\nMinimum: `{min_str}`\nMaximum: `{max_str}`")
@controller_bot.on(events.NewMessage(pattern=r"/reloadplugin(?: (.*))?"))
async def restart_plugin_cmd(event):
if event.sender_id != ADMIN_ID:
return
query = event.pattern_match.group(1)
sessions = list(session_coll.find())
if query:
query = query.strip().lower()
matches = []
for sess in sessions:
name = sess.get("name", "").lower()
if query in name:
matches.append(sess)
if not matches:
await event.reply(f"❌ No account matched for keyword: `{query}`")
return
if len(matches) == 1:
sess = matches[0]
await restart_plugin_for_user(sess["user_id"], sess["session"], sess.get("api_index", 0), event)
else:
buttons = [
[Button.inline(f"{s['name']} ({s['user_id']})", data=f"rplgx_{s['user_id']}")]
for s in matches
]
await event.respond("πŸ” Multiple matches. Select one:", buttons=buttons)
else:
await event.respond("♻️ Restarting all plugins...")
count = 0
for sess in sessions:
ok = await restart_plugin_for_user(sess["user_id"], sess["session"], sess.get("api_index", 0))
if ok:
count += 1
await event.respond(f"βœ… Restarted plugin logic for {count} sessions.")
@controller_bot.on(events.CallbackQuery(data=lambda d: d.startswith(b"rplgx_")))
async def restart_plugin_button(event):
if event.sender_id != ADMIN_ID:
return
user_id = event.data.decode().split("_", 1)[1]
sess = session_coll.find_one({"user_id": user_id})
if sess:
await restart_plugin_for_user(user_id, sess["session"], sess.get("api_index", 0), event)
async def restart_plugin_for_user(user_id: str, session_str: str, api_index: int, respond_event=None):
try:
# Find and stop existing instance
for bot in all_bots:
me = await bot.get_me()
if str(me.id) == user_id:
await bot.disconnect()
all_bots.remove(bot)
break
# Restart with same session
creds = API_CREDENTIALS[api_index]
bot = TelegramClient(StringSession(session_str), creds["API_ID"], creds["API_HASH"], device_model="Redmi 7")
await bot.start()
globals()['kanha_bot'] = bot
from telethon import events
def cmd(**kwargs):
return events.NewMessage(outgoing=True, **kwargs)
bot.cmd = cmd
load_plugins()
all_bots.append(bot)
asyncio.create_task(bot.run_until_disconnected())
msg = f"βœ… Restarted plugin session for `{user_id}`"
if respond_event:
await respond_event.respond(msg) if isinstance(respond_event, events.NewMessage.Event) else await respond_event.edit(msg)
return True
except Exception as e:
if respond_event:
await respond_event.respond(f"❌ Failed to restart `{user_id}`: {e}")
return False