# File size: 6,681 Bytes | 14d7d32
from . import kanha_bot, kanha_cmd
from telethon import events
from datetime import datetime, timedelta
import asyncio
import time
import subprocess
import traceback
import sys
import io
import pytz
# Wall-clock timestamp (time.time()) captured when this module is executed;
# get_uptime() subtracts it from the current time to report the uptime.
BOT_START_TIME = time.time()
def get_uptime():
    """Return the time elapsed since ``BOT_START_TIME`` as text.

    The elapsed time is truncated to whole seconds and rendered with
    ``str(timedelta)``, e.g. ``"1:01:01"`` or ``"1 day, 1:01:01"``.
    """
    elapsed = time.time() - BOT_START_TIME
    whole_seconds = int(elapsed)
    return str(timedelta(seconds=whole_seconds))
# ALIVE Command
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]alive"))
async def alive(event):
    """Edit the triggering ``.alive`` / ``!alive`` message into a status card.

    The card shows the account's display name (first name, else username,
    else the literal ``"User"``) and the uptime returned by ``get_uptime()``.
    """
    user = await event.client.get_me()
    name = user.first_name or user.username or "User"
    uptime = get_uptime()
    # NOTE(review): the first literal was split across two lines by a
    # mis-encoded glyph; it is rejoined here as U+2705, which is what the
    # visible bytes and the forced line break decode to. The remaining glyphs
    # below also look mis-encoded but are ambiguous, so they are kept exactly
    # as found -- restore them from the canonical copy of this file.
    await event.edit(
        "✅ **I'm alive and operational!**\n\n"
        f"π€ Account: `{name}`\n"
        f"β±οΈ Uptime: `{uptime}`\n"
        "β‘ Powered by: `Telethon`\n"
    )
# PING Command
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]ping"))
async def ping(event):
    """Reply to ``.ping`` / ``!ping`` and report how long the reply took.

    The reported latency is the wall-clock time spent awaiting
    ``event.reply(...)``, in milliseconds.
    """
    start = datetime.now()
    msg = await event.reply("π Pinging...")
    elapsed = datetime.now() - start
    # timedelta.microseconds holds only the sub-second component, so a
    # round trip of 1 s or more would be under-reported; total_seconds()
    # covers the whole interval.
    duration = elapsed.total_seconds() * 1000
    await msg.edit(f"π Pong!\nβ±οΈ `{duration:.2f}ms`")
# BASH Command
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]bash (.+)"))
async def run_bash(event):
    """Run the text after ``.bash`` / ``!bash`` in a shell and post the result.

    A status reply is sent first and then edited to contain the command's
    stdout and stderr (truncated to 4096 characters), or a confirmation when
    the command printed nothing. Any exception raised while running the
    command is reported in the same status message.
    """
    # The pattern's (.+) never captures an empty string, so strip first to
    # make the "nothing to run" guard reachable for whitespace-only input.
    cmd = event.pattern_match.group(1).strip()
    if not cmd:
        return await event.edit("`Give a Bash code to run`")
    status = await event.reply("π οΈ Processing command...")
    try:
        # SECURITY: the message text is handed to the shell verbatim. The
        # handler is registered with outgoing=True; do not widen that filter
        # without adding an explicit authorisation check.
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        sections = []
        if stdout:
            # errors="replace": command output is not guaranteed to be valid
            # UTF-8, and a strict decode would discard the whole result.
            out_text = stdout.decode(errors="replace").strip()
            sections.append(f"πΉ **Output:**\n`{out_text}`\n")
        if stderr:
            err_text = stderr.decode(errors="replace").strip()
            sections.append(f"π» **Error:**\n`{err_text}`")
        # NOTE(review): this literal was split across two lines by a
        # mis-encoded glyph and is rejoined as U+2705; the other glyphs in
        # this function are kept exactly as found.
        result = "".join(sections) or "✅ **Command executed. No output.**"
        await status.edit(result[:4096])
    except Exception as e:
        await status.edit(f"β Bash Error: `{e}`")
# EVAL Command
# (?s) lets (.+) span newlines; without it only the first line of a
# multi-line message was captured and the rest was silently dropped.
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"(?s)[.!]eval (.+)"))
async def run_eval(event):
    """Execute the Python code after ``.eval`` / ``!eval`` and post its output.

    The code is wrapped in ``async def __eval_fn(event)`` so it may use
    ``await`` and the triggering ``event``. Whatever it writes to
    ``sys.stdout`` / ``sys.stderr`` is captured; if nothing was written, the
    wrapper's truthy return value (or the traceback, on failure) is shown
    instead. The output is truncated to 4000 characters.
    """
    code = event.pattern_match.group(1)
    if not code.strip():
        return await event.edit("`Give a Python code to run`")
    status = await event.reply("βοΈ Evaluating...")

    source = "async def __eval_fn(event):\n" + "\n".join(
        f"    {line}" for line in code.split("\n")
    )
    # Explicit namespace: reading a name created by exec() back out of
    # locals() is not reliable inside a function (it breaks on Python 3.13+).
    namespace = {}
    stdout = io.StringIO()
    stderr = io.StringIO()
    old_stdout, old_stderr = sys.stdout, sys.stderr
    # sys.stdout / sys.stderr are process-wide, so anything else that prints
    # while the snippet is awaited lands in these buffers as well.
    sys.stdout, sys.stderr = stdout, stderr
    result = None
    try:
        # SECURITY: arbitrary code execution by design. The handler is
        # registered with outgoing=True; never expose it to other senders.
        exec(source, globals(), namespace)
        result = await namespace["__eval_fn"](event)
    except Exception:
        result = traceback.format_exc()
    finally:
        # Restore the real streams even on cancellation or other
        # non-Exception exits.
        sys.stdout, sys.stderr = old_stdout, old_stderr

    output = stdout.getvalue() + stderr.getvalue()
    if not output and result:
        output = str(result)
    if not output:
        # NOTE(review): this literal was split across two lines by a
        # mis-encoded glyph and is rejoined as U+2705; the other glyphs in
        # this function are kept exactly as found.
        output = "✅ **Code executed. No output.**"
    await status.edit(f"π₯ **Input:**\n```{code}```\n\nπ€ **Output:**\n```{output.strip()[:4000]}```")
from telethon import events
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]id$"))
async def get_ids(event):
    """Reply with the current chat ID and, when available, a user ID.

    When the command message is a reply to a message that has a sender ID,
    that sender ID is included; otherwise only the chat ID is reported.
    """
    # NOTE(review): the glyphs in the strings below look mis-encoded; they are
    # reproduced exactly as found -- confirm against the canonical file.
    chat_id = event.chat_id
    replied_to = await event.get_reply_message()
    if not (replied_to and replied_to.sender_id):
        await event.reply(f"π¬ Chat ID: `{chat_id}`\nβ οΈ No user replied.")
        return
    user_id = replied_to.sender_id
    await event.reply(f"π€ Replied User ID: `{user_id}`\nπ¬ Chat ID: `{chat_id}`")
# MULTI SCHEDULER Command
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]s (\d+) ([0-9]{1,2}:[0-9]{2})(?:,([0-9]{1,2}:[0-9]{2}))? (.+)"))
async def schedule_multi(event):
    """Schedule a message at one or two times of day for several days.

    Usage: ``.s <days> <H:MM>[,<H:MM>] <message>``, e.g. ``.s 1 20:28 .hexa``.
    Times are interpreted in the ``Asia/Kolkata`` timezone, starting with
    today's date. A summary reply reports how many messages were scheduled;
    a failure for one slot is reported and skips the rest of that day only.
    """
    try:
        match = event.pattern_match
        count = int(match[1])
        message = match[4]

        # Group 3 is None when only one time was given. Both times are
        # validated before anything is sent, so a bad second time cannot
        # leave a partially scheduled batch behind.
        times = []
        for raw in (match[2], match[3]):
            if not raw:
                continue
            hour, minute = map(int, raw.split(":"))
            if not (0 <= hour <= 23 and 0 <= minute <= 59):
                raise ValueError(f"invalid time {raw!r} (expected 0:00-23:59)")
            times.append((hour, minute))

        ist = pytz.timezone("Asia/Kolkata")
        now = datetime.now(ist)
        chat_id = event.chat_id
        scheduled = 0

        for day in range(count):
            for hour, minute in times:
                # NOTE(review): on day 0 this can lie in the past when the
                # requested time has already gone by today -- confirm how the
                # server treats such a schedule date.
                scheduled_time = ist.localize(
                    datetime(now.year, now.month, now.day, hour, minute)
                ) + timedelta(days=day)
                try:
                    await event.client.send_message(chat_id, message, schedule=scheduled_time)
                    scheduled += 1
                except Exception as e:
                    # Zero-padded so 08:05 is not reported as "8:5".
                    await event.reply(f"β Failed on day {day + 1} at {hour:02d}:{minute:02d}: {e}")
                    break  # skips the remaining time(s) of this day only

        # NOTE(review): this literal was split across two lines by a
        # mis-encoded glyph and is rejoined as U+2705; the other glyphs in
        # this function are kept exactly as found.
        await event.reply(f"✅ Scheduled `{message}` for {scheduled} time(s) across {count} day(s).")
    except Exception as e:
        await event.reply(f"β Error: {e}")
from telethon.tl.functions.messages import DeleteScheduledMessagesRequest as G
import asyncio
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]dels(?:\s+(.+))?"))
async def delete_scheduled_safe(event):
    """Delete every scheduled message in a chat, a few at a time.

    Usage: ``.dels`` for the current chat, or ``.dels <chat>`` where
    ``<chat>`` is a numeric ID (optionally negative) or a username with or
    without the leading ``@``. Progress and the final count are written by
    editing the command message; a failed batch is reported in a reply and
    the remaining batches are still attempted.
    """
    try:
        # Detect chat. A missing or whitespace-only argument means "this
        # chat"; previously a blank argument was turned into the chat "@".
        target = (event.pattern_match.group(1) or "").strip()
        if not target:
            chat = event.chat_id
        elif target.isdigit() or (target.startswith("-") and target[1:].isdigit()):
            chat = int(target)
        elif target.startswith("@"):
            chat = target
        else:
            chat = "@" + target

        await event.edit(f"π Fetching scheduled messages from `{chat}`...")

        # Collect all scheduled message IDs
        ids = [msg.id async for msg in event.client.iter_messages(chat, scheduled=True)]
        if not ids:
            return await event.edit("β No scheduled messages found.")

        deleted = 0
        batch_size = 5  # delete 5 messages per request to reduce spam risk

        # Delete in small batches
        for start in range(0, len(ids), batch_size):
            batch = ids[start:start + batch_size]
            try:
                await event.client(G(chat, batch))
                deleted += len(batch)
                await asyncio.sleep(0.5)  # small delay to avoid flood ban
            except Exception as e:
                await event.reply(f"β Error deleting batch {start // batch_size + 1}: {e}")
                await asyncio.sleep(1)  # longer delay after error

        # NOTE(review): this literal was split across two lines by a
        # mis-encoded glyph and is rejoined as U+2705; the other glyphs in
        # this function are kept exactly as found.
        await event.edit(f"✅ Deleted `{deleted}` scheduled message(s) from `{chat}`.")
    except Exception as e:
        await event.edit(f"β Error: {e}")
|