# Ultroid - UserBot
# Copyright (C) 2021-2025 TeamUltroid
#
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
# Please read the GNU Affero General Public License in
# <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>.
import inspect
import re
from traceback import format_exc
from telethon import Button
from telethon.errors import QueryIdInvalidError
from telethon.events import CallbackQuery, InlineQuery, NewMessage
from telethon.tl.types import InputWebDocument
from .. import LOGS, asst, udB, ultroid_bot
from ..fns.admins import admin_check
from . import append_or_update, owner_and_sudos
# Display name of the `ultroid_bot` account; interpolated into the
# user-facing texts built in this module.
OWNER = ultroid_bot.full_name

# Article text shown by `in_pattern` to senders that fail its owner check.
# The separator lines are "heavy minus sign" characters; they had been
# corrupted by a wrong-charset decode and are restored here.
MSG = f"""
**Ultroid - UserBot**
➖➖➖➖➖➖➖➖➖➖
**Owner**: [{OWNER}](tg://user?id={ultroid_bot.uid})
**Support**: @TeamUltroid
➖➖➖➖➖➖➖➖➖➖
"""

# Single row of URL buttons attached to the article built from `MSG`.
IN_BTTS = [
    [
        Button.url(
            "Repository",
            url="https://github.com/TeamUltroid/Ultroid",
        ),
        Button.url("Support", url="https://t.me/UltroidSupportChat"),
    ]
]
# decorator for assistant
def asst_cmd(pattern=None, load=None, owner=False, **kwargs):
    """Decorator for assistant's command.

    Registers the decorated coroutine on ``asst`` as a ``NewMessage`` handler.

    Args:
        pattern: Command text without the leading slash. When truthy it is
            compiled as ``^/{pattern}`` and used as the event pattern.
        load: When not ``None``, passed to ``append_or_update`` together with
            the decorated function, the decorating file's name and the event
            keyword arguments.
        owner: When true, events whose sender is not in ``owner_and_sudos()``
            are dropped without a reply.
        **kwargs: Extra keyword arguments for ``NewMessage``. ``forwards`` is
            always overwritten with ``False``.

    Returns:
        The registering decorator. Note that it does not return the function
        it decorates.
    """
    from pathlib import Path  # local import keeps this block self-contained

    # File name (no directory, no extension) of the module that called
    # ``asst_cmd``. ``Path.stem`` handles the platform's path separator and
    # only strips the final suffix, unlike splitting on "/" and replacing
    # every ".py" occurrence.
    name = Path(inspect.stack()[1].filename).stem
    kwargs["forwards"] = False

    def ult(func):
        if pattern:
            kwargs["pattern"] = re.compile(f"^/{pattern}")

        async def handler(event):
            if owner and event.sender_id not in owner_and_sudos():
                return
            try:
                await func(event)
            except Exception as er:
                # Boundary of the event loop callback: log instead of raising.
                LOGS.exception(er)

        asst.add_event_handler(handler, NewMessage(**kwargs))
        if load is not None:
            append_or_update(load, func, name, kwargs)

    return ult
def callback(data=None, from_users=None, admins=False, owner=False, **kwargs):
    """Assistant's callback decorator.

    Wraps the decorated coroutine with access checks and registers it as a
    ``CallbackQuery`` handler.

    Args:
        data: Passed to ``CallbackQuery`` as ``data``.
        from_users: Optional iterable of sender ids allowed to press the
            button. The string ``"me"`` stands for ``ultroid_bot.uid``.
            Empty or ``None`` disables this check. The argument is never
            mutated.
        admins: When true, the event must pass ``admin_check`` or it is
            ignored without a reply.
        owner: When true, senders not in ``owner_and_sudos()`` get a notice
            and the wrapped function is not called.
        **kwargs: Extra keyword arguments forwarded to ``CallbackQuery``.

    Returns:
        The registering decorator. Note that it does not return the function
        it decorates.
    """
    # Work on a private copy so neither the caller's list nor a shared
    # default is modified while resolving the "me" placeholder.
    allowed_users = [
        ultroid_bot.uid if user == "me" else user for user in (from_users or ())
    ]

    def ultr(func):
        async def wrapper(event):
            if admins and not await admin_check(event):
                return
            if allowed_users and event.sender_id not in allowed_users:
                return await event.answer("Not for You!", alert=True)
            if owner and event.sender_id not in owner_and_sudos():
                return await event.answer(f"This is {OWNER}'s bot!!")
            try:
                await func(event)
            except Exception as er:
                LOGS.exception(er)

        # Register on assistant bot if available. This stays best-effort, but
        # a failure is logged instead of being swallowed silently.
        on_assistant = False
        try:
            is_bot = getattr(asst, "_bot", False) or getattr(
                getattr(asst, "me", None), "bot", False
            )
            if is_bot:
                asst.add_event_handler(wrapper, CallbackQuery(data=data, **kwargs))
                on_assistant = True
        except Exception as er:
            LOGS.exception(er)

        # Also register on main userbot so buttons work without assistant bot.
        # Skipped only when both names refer to the same client object and the
        # handler is already attached, which would run the callback twice.
        if not (on_assistant and ultroid_bot is asst):
            ultroid_bot.add_event_handler(
                wrapper, CallbackQuery(data=data, **kwargs)
            )

    return ultr
def in_pattern(pattern=None, owner=False, **kwargs):
    """Assistant's inline decorator.

    Wraps the decorated coroutine with an owner check and error reporting,
    then registers it on ``asst`` as an ``InlineQuery`` handler.

    Args:
        pattern: Passed to ``InlineQuery`` as the pattern; also quoted in the
            error report.
        owner: When true, a sender not in ``owner_and_sudos()`` is answered
            with a single article built from ``MSG`` and ``IN_BTTS`` plus a
            switch-to-PM entry, and the wrapped function is not called.
        **kwargs: Extra keyword arguments forwarded to ``InlineQuery``.

    Returns:
        The registering decorator. Note that it does not return the function
        it decorates.
    """

    def don(func):
        async def wrapper(event):
            if owner and event.sender_id not in owner_and_sudos():
                res = [
                    await event.builder.article(
                        title="Ultroid Userbot",
                        url="https://t.me/TeamUltroid",
                        description="(c) TeamUltroid",
                        text=MSG,
                        thumb=InputWebDocument(
                            "https://graph.org/file/dde85d441fa051a0d7d1d.jpg",
                            0,
                            "image/jpeg",
                            [],
                        ),
                        buttons=IN_BTTS,
                    )
                ]
                return await event.answer(
                    res,
                    # The robot emoji had been corrupted by a wrong-charset
                    # decode; restored here.
                    switch_pm=f"🤖: Assistant of {OWNER}",
                    switch_pm_param="start",
                )
            try:
                await func(event)
            except QueryIdInvalidError:
                # Deliberately ignored.
                pass
            except Exception as er:
                # Capture the handler's traceback once, while it is the
                # exception being handled, so every later report shows it.
                err = format_exc()

                def error_text():
                    return f"**#ERROR #INLINE**\n\nQuery: `{asst.me.username} {pattern}`\n\n**Traceback:**\n`{err}`"

                LOGS.exception(er)
                try:
                    await event.answer(
                        [
                            await event.builder.article(
                                title="Unhandled Exception has Occured!",
                                text=error_text(),
                                buttons=Button.url(
                                    "Report", "https://t.me/UltroidSupportChat"
                                ),
                            )
                        ]
                    )
                except QueryIdInvalidError:
                    LOGS.exception(err)
                except Exception as answer_er:
                    # Answering the query failed; fall back to the chat
                    # stored under the LOG_CHANNEL key.
                    LOGS.exception(answer_er)
                    await asst.send_message(
                        udB.get_key("LOG_CHANNEL"), error_text()
                    )

        asst.add_event_handler(wrapper, InlineQuery(pattern=pattern, **kwargs))

    return don
|