| from . import kanha_bot, kanha_cmd |
| from telethon import events |
| from datetime import datetime, timedelta |
| import asyncio |
| import time |
| import subprocess |
| import traceback |
| import sys |
| import io |
| import pytz |
|
|
| |
# Wall-clock timestamp captured once at import; uptime is measured from here.
BOT_START_TIME = time.time()


def get_uptime():
    """Return the time elapsed since ``BOT_START_TIME`` as a string.

    The elapsed time is truncated to whole seconds and rendered with
    ``str(timedelta)``, e.g. ``"0:05:42"`` or ``"1 day, 2:03:04"``.
    """
    elapsed = timedelta(seconds=int(time.time() - BOT_START_TIME))
    return str(elapsed)
|
|
| |
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]alive"))
async def alive(event):
    """Edit the triggering message into a short status report.

    The report shows the logged-in account's display name (first name,
    falling back to the username and then to ``"User"``), the uptime
    returned by ``get_uptime()`` and the library name.
    """
    user = await event.client.get_me()
    name = user.first_name or user.username or "User"
    uptime = get_uptime()
    # NOTE(review): emoji were reconstructed from mis-decoded text; confirm
    # the glyphs match what was originally intended.
    await event.edit(
        "✅ **I'm alive and operational!**\n\n"
        f"👤 Account: `{name}`\n"
        f"⏱️ Uptime: `{uptime}`\n"
        "⚡ Powered by: `Telethon`\n"
    )
|
|
| |
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]ping"))
async def ping(event):
    """Reply, then edit the reply to show how long sending it took (in ms)."""
    start = datetime.now()
    # NOTE(review): emoji were reconstructed from mis-decoded text; confirm.
    msg = await event.reply("🏓 Pinging...")
    elapsed = datetime.now() - start
    # timedelta.microseconds is only the sub-second component; total_seconds()
    # keeps the whole seconds too, so slow round trips are not under-reported.
    duration = elapsed.total_seconds() * 1000
    await msg.edit(f"🏓 Pong!\n⏱️ `{duration:.2f}ms`")
|
|
| |
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]bash (.+)"))
async def run_bash(event):
    """Run the text after the command as a shell command and report output.

    A status reply is posted first and then edited to show the captured
    stdout and stderr (truncated to 4096 characters), or a short notice
    when the command produced no output at all.
    """
    cmd = event.pattern_match.group(1).strip()
    if not cmd:
        return await event.edit("`Give a Bash code to run`")
    # NOTE(review): emoji in this handler were reconstructed from
    # mis-decoded text; confirm the glyphs.
    status = await event.reply("🛠️ Processing command...")

    try:
        # SECURITY: the text is handed to the shell verbatim. The handler is
        # registered for outgoing messages only, so it runs whatever the
        # logged-in account itself sends.
        process = await asyncio.create_subprocess_shell(
            cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()

        # Command output is not guaranteed to be valid UTF-8; replace
        # undecodable bytes instead of failing the whole command.
        sections = []
        if stdout:
            out_text = stdout.decode(errors="replace").strip()
            sections.append(f"🔹 **Output:**\n`{out_text}`\n")
        if stderr:
            err_text = stderr.decode(errors="replace").strip()
            sections.append(f"🔻 **Error:**\n`{err_text}`")

        result = "".join(sections) or "✅ **Command executed. No output.**"
        await status.edit(result[:4096])
    except Exception as e:
        await status.edit(f"❌ Bash Error: `{e}`")
|
|
| |
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]eval (.+)"))
async def run_eval(event):
    """Execute the given Python snippet and report what it produced.

    The snippet is wrapped in an ``async def`` that receives ``event``, so
    it may use ``await`` and ``return``. Everything written to
    ``sys.stdout`` / ``sys.stderr`` while it runs is captured, a traceback
    is appended if it raises, and the return value is shown when nothing
    else was produced.
    """
    code = event.pattern_match.group(1).strip()
    if not code:
        return await event.edit("`Give a Python code to run`")
    # NOTE(review): emoji in this handler were reconstructed from
    # mis-decoded text; confirm the glyphs.
    status = await event.reply("⚙️ Evaluating...")

    # SECURITY: this executes arbitrary Python with this module's globals.
    # The handler is registered for outgoing messages only.
    body = "\n".join(f"    {line}" for line in code.split("\n"))
    namespace = {}
    captured_out = io.StringIO()
    captured_err = io.StringIO()
    result = None
    error = ""

    old_stdout, old_stderr = sys.stdout, sys.stderr
    sys.stdout, sys.stderr = captured_out, captured_err
    try:
        # An explicit namespace is required: since Python 3.13 (PEP 667) a
        # name created by exec() inside a function is not visible through a
        # later locals() call.
        exec(f"async def __eval_fn(event):\n{body}", globals(), namespace)
        result = await namespace["__eval_fn"](event)
    except Exception:
        error = traceback.format_exc()
    finally:
        # The redirection is process-wide, so always undo it, even when the
        # task is cancelled while the snippet is running.
        sys.stdout, sys.stderr = old_stdout, old_stderr

    # Keep the traceback even when the snippet printed before failing.
    output = captured_out.getvalue() + captured_err.getvalue() + error
    if not output and result is not None:
        output = str(result)
    if not output:
        output = "✅ **Code executed. No output.**"

    await status.edit(
        f"📥 **Input:**\n```{code}```\n\n"
        f"📤 **Output:**\n```{output.strip()[:4000]}```"
    )
| from telethon import events |
|
|
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]id$"))
async def get_ids(event):
    """Reply with the current chat id and, if replying, the sender's id.

    When the command is sent as a reply to a message that has a sender,
    that sender's id is reported together with the chat id; otherwise only
    the chat id is reported.
    """
    chat_id = event.chat_id
    reply = await event.get_reply_message()

    # NOTE(review): emoji were reconstructed from mis-decoded text; confirm.
    if reply and reply.sender_id:
        await event.reply(
            f"👤 Replied User ID: `{reply.sender_id}`\n💬 Chat ID: `{chat_id}`"
        )
    else:
        await event.reply(f"💬 Chat ID: `{chat_id}`\n⚠️ No user replied.")
| |
| |
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]s (\d+) ([0-9]{1,2}:[0-9]{2})(?:,([0-9]{1,2}:[0-9]{2}))? (.+)"))
async def schedule_multi(event):
    """Schedule a message at one or two daily times for several days.

    Command form: ``.s <days> <HH:MM>[,<HH:MM>] <text>``, for example
    ``.s 1 20:28 .hexa``. Times are interpreted in the Asia/Kolkata zone,
    starting from today's date in that zone. A summary reply reports how
    many messages were actually scheduled.
    """
    try:
        args = event.pattern_match
        count = int(args[1])
        message = args[4]
        ist = pytz.timezone("Asia/Kolkata")
        now = datetime.now(ist)
        chat_id = event.chat_id

        # Build today's naive wall-clock slots up front so that an invalid
        # time (e.g. 25:00) is rejected before anything has been scheduled.
        slots = []
        for time_str in (args[2], args[3]):
            if not time_str:
                continue  # the second time is optional
            hour, minute = map(int, time_str.split(":"))
            slots.append(datetime(now.year, now.month, now.day, hour, minute))

        # NOTE(review): emoji in this handler were reconstructed from
        # mis-decoded text; confirm the glyphs.
        scheduled = 0
        for day in range(count):
            for slot in slots:
                # Shift the naive datetime first and localize afterwards,
                # which is the arithmetic pattern pytz documents.
                scheduled_time = ist.localize(slot + timedelta(days=day))
                try:
                    await event.client.send_message(chat_id, message, schedule=scheduled_time)
                    scheduled += 1
                except Exception as e:
                    await event.reply(f"❌ Failed on day {day + 1} at {slot:%H:%M}: {e}")
                    break  # skip the remaining time(s) of this day only

        await event.reply(f"✅ Scheduled `{message}` for {scheduled} time(s) across {count} day(s).")

    except Exception as e:
        await event.reply(f"❌ Error: {e}")
|
|
| from telethon.tl.functions.messages import DeleteScheduledMessagesRequest as G |
| import asyncio |
|
|
@kanha_bot.on(events.NewMessage(outgoing=True, pattern=r"[.!]dels(?:\s+(.+))?"))
async def delete_scheduled_safe(event):
    """Delete every scheduled message in a chat, a few at a time.

    With no argument the current chat is used. Otherwise the argument is
    either a numeric chat id (optionally negative) or a username, with or
    without the leading ``@``. Progress and the final count are written by
    editing the triggering message; per-batch failures are sent as replies.
    """
    try:
        arg = event.pattern_match.group(1)

        if arg:
            chat = arg.strip()
            if chat.isdigit() or (chat.startswith("-") and chat[1:].isdigit()):
                chat = int(chat)
            elif not chat.startswith("@"):
                chat = "@" + chat
        else:
            chat = event.chat_id

        # NOTE(review): emoji in this handler were reconstructed from
        # mis-decoded text; confirm the glyphs.
        await event.edit(f"🔍 Fetching scheduled messages from `{chat}`...")

        ids = [msg.id async for msg in event.client.iter_messages(chat, scheduled=True)]
        if not ids:
            return await event.edit("❌ No scheduled messages found.")

        deleted = 0
        batch_size = 5

        for start in range(0, len(ids), batch_size):
            batch = ids[start:start + batch_size]
            try:
                await event.client(G(chat, batch))
                deleted += len(batch)
                # Short pause between requests; presumably to stay clear of
                # rate limits (TODO confirm).
                await asyncio.sleep(0.5)
            except Exception as e:
                await event.reply(f"❌ Error deleting batch {start // batch_size + 1}: {e}")
                # Wait a little longer after a failure before continuing.
                await asyncio.sleep(1)

        await event.edit(f"✅ Deleted `{deleted}` scheduled message(s) from `{chat}`.")

    except Exception as e:
        await event.edit(f"❌ Error: {e}")
|
|