| |
| |
| |
| |
| |
| |
|
|
| from . import get_help |
|
|
| __doc__ = get_help("help_bot") |
|
|
| import os |
| import sys |
| import time |
| from platform import python_version as pyver |
| from random import choice |
|
|
| from telethon import __version__ |
| from telethon.errors.rpcerrorlist import ( |
| BotMethodInvalidError, |
| ChatSendMediaForbiddenError, |
| ) |
|
|
| from pyUltroid.version import __version__ as UltVer |
|
|
| from . import HOSTED_ON, LOGS |
|
|
| try: |
| from git import Repo |
| except ImportError: |
| LOGS.error("bot: 'gitpython' module not found!") |
| Repo = None |
|
|
| from telethon.utils import resolve_bot_file_id |
|
|
| from . import ( |
| ATRA_COL, |
| LOGS, |
| OWNER_NAME, |
| ULTROID_IMAGES, |
| Button, |
| Carbon, |
| Telegraph, |
| Var, |
| allcmds, |
| asst, |
| bash, |
| call_back, |
| callback, |
| def_logs, |
| eor, |
| get_string, |
| heroku_logs, |
| in_pattern, |
| inline_pic, |
| restart, |
| shutdown, |
| start_time, |
| time_formatter, |
| udB, |
| ultroid_cmd, |
| ultroid_version, |
| updater, |
| ) |
|
|
|
|
def ULTPIC():
    """Return the result of ``inline_pic()`` when truthy, else a random
    entry from ``ULTROID_IMAGES``."""
    picture = inline_pic()
    if picture:
        return picture
    return choice(ULTROID_IMAGES)
|
|
|
|
# One row of URL buttons (labels come from the "bot_3" / "bot_4" strings);
# passed as reply markup by the alive handlers in this module.
buttons = [
    [
        Button.url(get_string(label_key), target)
        for label_key, target in (
            ("bot_3", "https://github.com/TeamUltroid/Ultroid"),
            ("bot_4", "t.me/UltroidSupportChat"),
        )
    ]
]
|
|
| |
# Plain-text summary shown by the "alive" callback alert.
# Placeholders, in order: Ultroid version, Py-Ultroid version, Telethon version.
alive_txt = """
The Ultroid Userbot

β Version - {}
β Py-Ultroid - {}
β Telethon - {}
"""


# HTML template for the inline-style alive card.
# Placeholders, in order: header, Ultroid version, PyUltroid version,
# Python version, uptime, branch link.
# The first bold span is now closed with </b>; it previously used a second
# opening <b>, leaving the markup unbalanced.
in_alive = "{}\n\nπ <b>Ultroid Version -></b> <code>{}</code>\nπ <b>PyUltroid -></b> <code>{}</code>\nπ <b>Python -></b> <code>{}</code>\nπ <b>Uptime -></b> <code>{}</code>\nπ <b>Branch -></b>[ {} ]\n\nβ’ <b>Join @TeamUltroid</b>"
|
|
|
|
@callback("alive")
async def alive(event):
    """Answer the "alive" callback with the version summary as an alert."""
    versions = (ultroid_version, UltVer, __version__)
    await event.answer(alive_txt.format(*versions), alert=True)
|
|
|
|
@ultroid_cmd(
    pattern="alive( (.*)|$)",
)
async def lol(ult):
    """Reply with the alive card: header, versions, uptime and branch.

    With the argument ``inline`` or ``i`` the assistant's inline "alive"
    result is requested and clicked into the chat. If that raises, the HTML
    template ``in_alive`` is sent directly, together with ``buttons``.
    Without the argument, the ``alive_1`` string is sent as markdown.

    When ``ALIVE_PIC`` is set the card is sent as a caption on that media.
    On failure it falls back to media and text as two replies, and finally
    to ``eor`` with text only.
    """
    match = ult.pattern_match.group(1).strip()
    inline = None
    if match in ["inline", "i"]:
        try:
            res = await ult.client.inline_query(asst.me.username, "alive")
            return await res[0].click(ult.chat_id)
        except BotMethodInvalidError:
            pass
        # Exception, not BaseException: task cancellation and interpreter
        # exit must propagate instead of being logged and ignored.
        except Exception as er:
            LOGS.exception(er)
        inline = True

    pic = udB.get_key("ALIVE_PIC")
    if isinstance(pic, list):
        pic = choice(pic)
    uptime = time_formatter((time.time() - start_time) * 1000)
    header = udB.get_key("ALIVE_TEXT") or get_string("bot_1")

    # Repo is None when the gitpython import failed; in that case show a
    # placeholder instead of raising TypeError on Repo().
    branch = rep = None
    if Repo is not None:
        repo = Repo()  # opened once and reused for both lookups
        branch = repo.active_branch
        rep = repo.remotes[0].config_reader.get("url").replace(
            ".git", f"/tree/{branch}"
        )

    emoji = udB.get_key("ALIVE_EMOJI")
    markup = buttons if inline else None
    if inline:
        kk = f"<a href={rep}>{branch}</a>" if rep is not None else "unknown"
        parse = "html"
        als = in_alive.format(
            header,
            f"{ultroid_version} [{HOSTED_ON}]",
            UltVer,
            pyver(),
            uptime,
            kk,
        )
        if emoji:
            als = als.replace("π", emoji)
    else:
        kk = f" `[{branch}]({rep})` " if rep is not None else " `unknown` "
        parse = "md"
        als = get_string("alive_1").format(
            header,
            OWNER_NAME,
            f"{ultroid_version} [{HOSTED_ON}]",
            UltVer,
            uptime,
            pyver(),
            __version__,
            kk,
        )
        if emoji:
            als = als.replace("β΅", emoji)

    if pic:
        try:
            await ult.reply(
                als,
                file=pic,
                parse_mode=parse,
                link_preview=False,
                buttons=markup,
            )
            return await ult.try_delete()
        except ChatSendMediaForbiddenError:
            # Media is not allowed in this chat: go straight to text only.
            pass
        except Exception as er:
            LOGS.exception(er)
            # Second attempt: media and text as two separate replies.
            try:
                await ult.reply(file=pic)
                await ult.reply(
                    als,
                    parse_mode=parse,
                    buttons=markup,
                    link_preview=False,
                )
                return await ult.try_delete()
            except Exception as fallback_err:
                LOGS.exception(fallback_err)
    await eor(
        ult,
        als,
        parse_mode=parse,
        link_preview=False,
        buttons=markup,
    )
|
|
|
|
@ultroid_cmd(pattern="ping$", chats=[], type=["official", "assistant"])
async def _(event):
    """Reply "Pong !", then edit it to show the round-trip time in
    milliseconds and the formatted uptime."""
    began = time.time()
    pong = await event.eor("Pong !")
    elapsed_ms = round((time.time() - began) * 1000)
    running_for = time_formatter((time.time() - start_time) * 1000)
    report = get_string("ping").format(elapsed_ms, running_for)
    await pong.edit(report)
|
|
|
|
@ultroid_cmd(pattern="cmds$")
async def cmds(event):
    """Delegate to ``allcmds``, handing it the event and ``Telegraph``."""
    await allcmds(event, Telegraph)
|
|
|
|
# Value of Var.HEROKU_API; when truthy, restartbt hands off to restart().
heroku_api = Var.HEROKU_API


@ultroid_cmd(pattern="restart$", fullsudo=True)
async def restartbt(ult):
    """Restart the bot.

    Stores ``"<bot|user>_<chat id>_<message id>"`` under the ``_RESTART``
    key, then either delegates to ``restart`` (when ``heroku_api`` is set)
    or pulls the repository, reinstalls requirements and replaces the
    current process via ``os.execl``.
    """
    status = await ult.eor(get_string("bot_5"))
    call_back()
    if ult.client._bot:
        who = "bot"
    else:
        who = "user"
    udB.set_key("_RESTART", f"{who}_{ult.chat_id}_{status.id}")
    if heroku_api:
        return await restart(status)
    for command in (
        "git pull && pip3 install -r requirements.txt",
        "pip3 install -r requirements.txt --break-system-packages",
    ):
        await bash(command)
    # Extra command-line arguments select main.py; otherwise run the package.
    relaunch = ("main.py",) if len(sys.argv) > 1 else ("-m", "pyUltroid")
    os.execl(sys.executable, sys.executable, *relaunch)
|
|
|
|
@ultroid_cmd(pattern="shutdown$", fullsudo=True)
async def shutdownbot(ult):
    """Delegate the ``shutdown`` command to the ``shutdown`` helper."""
    await shutdown(ult)
|
|
|
|
@ultroid_cmd(
    pattern="logs( (.*)|$)",
    chats=[],
)
async def _(event):
    """Send the bot's log in the form selected by the command argument.

    * ``heroku``: delegate to ``heroku_logs``.
    * ``carbon`` (only when ``Carbon`` is truthy): render the last 2500
      characters through ``Carbon`` and reply with the result.
    * ``open``: post the last 4000 characters as inline code.
    * an integer from 5 to 100: post that many trailing lines as inline
      code, capped at the last 4000 characters.
    * anything else: delegate to ``def_logs``.

    Every branch reads the same log path; it carries the last command-line
    argument as a suffix when the process was started with arguments.
    """
    opt = event.pattern_match.group(1).strip()
    log_path = (
        f"ultroid{sys.argv[-1]}.log" if len(sys.argv) > 1 else "ultroid.log"
    )
    if opt == "heroku":
        await heroku_logs(event)
    elif opt == "carbon" and Carbon:
        event = await event.eor(get_string("com_1"))
        with open(log_path, "r") as f:
            code = f.read()[-2500:]
        rendered = await Carbon(
            file_name="ultroid-logs",
            code=code,
            backgroundColor=choice(ATRA_COL),
        )
        # A dict result is shown as text instead of being sent as a file.
        if isinstance(rendered, dict):
            await event.eor(f"`{rendered}`")
            return
        await event.reply("**Ultroid Logs.**", file=rendered)
    elif opt == "open":
        # Previously hard-coded "ultroid.log", ignoring the per-client path.
        with open(log_path, "r") as f:
            tail = f.read()[-4000:]
        return await event.eor(f"`{tail}`")
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscripts that int() cannot parse.
    elif opt.isdecimal() and 5 <= int(opt) <= 100:
        num_lines = int(opt)
        with open(log_path, "r") as f:
            lines = f.readlines()[-num_lines:]
        # Same 4000-character cap as "open", so long lines still fit in
        # a single message.
        tail = "".join(lines)[-4000:]
        return await event.eor(f"`{tail}`")
    else:
        await def_logs(event, log_path)
    await event.try_delete()
|
|
|
|
@in_pattern("alive", owner=True)
async def inline_alive(ult):
    """Answer the inline "alive" query with the HTML alive card.

    When ``ALIVE_PIC`` is set, a photo result is tried for values containing
    ".jpg" and a document result otherwise. A document result carries no
    text, so when the value resolves as a bot file id an extra "alive"
    callback button is put above the shared ``buttons`` rows. Any failure is
    logged and an article result with the card text is answered instead.
    """
    pic = udB.get_key("ALIVE_PIC")
    if isinstance(pic, list):
        pic = choice(pic)
    uptime = time_formatter((time.time() - start_time) * 1000)
    header = udB.get_key("ALIVE_TEXT") or get_string("bot_1")

    # Repo is None when the gitpython import failed; show a placeholder
    # rather than raising TypeError on Repo().
    if Repo is not None:
        repo = Repo()  # opened once and reused for both lookups
        branch = repo.active_branch
        rep = repo.remotes[0].config_reader.get("url").replace(
            ".git", f"/tree/{branch}"
        )
        kk = f"<a href={rep}>{branch}</a>"
    else:
        kk = "unknown"

    als = in_alive.format(
        header, f"{ultroid_version} [{HOSTED_ON}]", UltVer, pyver(), uptime, kk
    )
    if _e := udB.get_key("ALIVE_EMOJI"):
        als = als.replace("π", _e)

    builder = ult.builder
    # Per-call markup. The module-level `buttons` list must not be mutated:
    # inserting into it added another "alive" row on every query and leaked
    # that row into every other user of `buttons`.
    markup = buttons
    if pic:
        try:
            if ".jpg" in pic:
                results = [
                    await builder.photo(
                        pic, text=als, parse_mode="html", buttons=markup
                    )
                ]
            else:
                if _pic := resolve_bot_file_id(pic):
                    pic = _pic
                    markup = [
                        [Button.inline(get_string("bot_2"), data="alive")],
                        *buttons,
                    ]
                results = [
                    await builder.document(
                        pic,
                        title="Inline Alive",
                        description="@TeamUltroid",
                        parse_mode="html",
                        buttons=markup,
                    )
                ]
            return await ult.answer(results)
        # Exception, not BaseException: cancellation must propagate.
        except Exception as er:
            LOGS.exception(er)
    result = [
        await builder.article(
            "Alive", text=als, parse_mode="html", link_preview=False, buttons=markup
        )
    ]
    await ult.answer(result)
|
|
|
|
@ultroid_cmd(pattern="update( (.*)|$)")
async def _(e):
    """Check for updates, or update in place when asked to.

    When the argument contains "fast" or "soft", the repository is pulled,
    requirements are reinstalled and the process is replaced via
    ``os.execl``. Otherwise ``updater`` is consulted: a truthy result posts
    an "Update Available" notice to the log channel and links to it; a falsy
    result reports that the current branch is up to date.
    """
    status = await e.eor(get_string("upd_1"))
    arg = e.pattern_match.group(1).strip()
    if any(mode in arg for mode in ("fast", "soft")):
        for command in (
            "git pull -f && pip3 install -r requirements.txt",
            "pip3 install -r requirements.txt --break-system-packages",
        ):
            await bash(command)
        call_back()
        await status.edit(get_string("upd_7"))
        os.execl(sys.executable, "python3", "-m", "pyUltroid")

    has_update = await updater()
    branch = Repo.init().active_branch
    if not has_update:
        await status.edit(
            f'<code>Your BOT is </code><strong>up-to-date</strong><code> with </code><strong><a href="https://github.com/TeamUltroid/Ultroid/tree/{branch}">[{branch}]</a></strong>',
            parse_mode="html",
            link_preview=False,
        )
        return
    notice = await asst.send_file(
        udB.get_key("LOG_CHANNEL"),
        ULTPIC(),
        caption="β’ **Update Available** β’",
        force_document=False,
        buttons=Button.inline("Changelogs", data="changes"),
    )
    await status.edit(
        f'<strong><a href="{notice.message_link}">[ChangeLogs]</a></strong>',
        parse_mode="html",
        link_preview=False,
    )
|
|
|
|
@callback("updtavail", owner=True)
async def updava(event):
    """Delete the triggering message and post the "Update Available"
    notice, with a "Changelogs" button, to the log channel."""
    await event.delete()
    log_channel = udB.get_key("LOG_CHANNEL")
    picture = ULTPIC()
    changelog_button = Button.inline("Changelogs", data="changes")
    await asst.send_file(
        log_channel,
        picture,
        caption="β’ **Update Available** β’",
        force_document=False,
        buttons=changelog_button,
    )
|
|