Spaces:
Paused
Paused
| # Ultroid - UserBot | |
| # Copyright (C) 2021-2025 TeamUltroid | |
| # | |
| # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
| # PLease read the GNU Affero General Public License in | |
| # <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>. | |
| import asyncio | |
| import os | |
| import random | |
| import shutil | |
| import time | |
| from random import randint | |
| from ..configs import Var | |
| try: | |
| from pytz import timezone | |
| except ImportError: | |
| timezone = None | |
| from telethon.errors import ( | |
| ChannelsTooMuchError, | |
| ChatAdminRequiredError, | |
| MessageIdInvalidError, | |
| MessageNotModifiedError, | |
| UserNotParticipantError, | |
| ) | |
| from telethon.tl.custom import Button | |
| from telethon.tl.functions.channels import ( | |
| CreateChannelRequest, | |
| EditAdminRequest, | |
| EditPhotoRequest, | |
| InviteToChannelRequest, | |
| ) | |
| from telethon.tl.functions.channels import JoinChannelRequest | |
| from telethon.tl.functions.contacts import UnblockRequest | |
| from telethon.tl.types import ( | |
| ChatAdminRights, | |
| ChatPhotoEmpty, | |
| InputChatUploadedPhoto, | |
| InputMessagesFilterDocument, | |
| ) | |
| from telethon.utils import get_peer_id | |
| from decouple import config, RepositoryEnv | |
| from .. import LOGS, ULTConfig | |
| from ..fns.helper import download_file, inline_mention, updater | |
| db_url = 0 | |
| async def autoupdate_local_database(): | |
| from .. import Var, asst, udB, ultroid_bot | |
| global db_url | |
| db_url = ( | |
| udB.get_key("TGDB_URL") or Var.TGDB_URL or ultroid_bot._cache.get("TGDB_URL") | |
| ) | |
| if db_url: | |
| _split = db_url.split("/") | |
| _channel = _split[-2] | |
| _id = _split[-1] | |
| try: | |
| await asst.edit_message( | |
| int(_channel) if _channel.isdigit() else _channel, | |
| message=_id, | |
| file="database.json", | |
| text="**Do not delete this file.**", | |
| ) | |
| except MessageNotModifiedError: | |
| return | |
| except MessageIdInvalidError: | |
| pass | |
| try: | |
| LOG_CHANNEL = ( | |
| udB.get_key("LOG_CHANNEL") | |
| or Var.LOG_CHANNEL | |
| or asst._cache.get("LOG_CHANNEL") | |
| or "me" | |
| ) | |
| msg = await asst.send_message( | |
| LOG_CHANNEL, "**Do not delete this file.**", file="database.json" | |
| ) | |
| asst._cache["TGDB_URL"] = msg.message_link | |
| udB.set_key("TGDB_URL", msg.message_link) | |
| except Exception as ex: | |
| LOGS.error(f"Error on autoupdate_local_database: {ex}") | |
| def update_envs(): | |
| """Update Var. attributes to udB""" | |
| from .. import udB | |
| _envs = [*list(os.environ)] | |
| if ".env" in os.listdir("."): | |
| [_envs.append(_) for _ in list(RepositoryEnv(config._find_file(".")).data)] | |
| for envs in _envs: | |
| if ( | |
| envs in ["LOG_CHANNEL", "BOT_TOKEN", "BOTMODE", "DUAL_MODE", "language"] | |
| or envs in udB.keys() | |
| ): | |
| if _value := os.environ.get(envs): | |
| udB.set_key(envs, _value) | |
| else: | |
| udB.set_key(envs, config.config.get(envs)) | |
| async def startup_stuff(): | |
| from .. import udB | |
| x = ["resources/auth", "resources/downloads"] | |
| for x in x: | |
| if not os.path.isdir(x): | |
| os.mkdir(x) | |
| CT = udB.get_key("CUSTOM_THUMBNAIL") | |
| if CT: | |
| path = "resources/extras/thumbnail.jpg" | |
| ULTConfig.thumb = path | |
| try: | |
| await download_file(CT, path) | |
| except Exception as er: | |
| LOGS.exception(er) | |
| elif CT is False: | |
| ULTConfig.thumb = None | |
| GT = udB.get_key("GDRIVE_AUTH_TOKEN") | |
| if GT: | |
| with open("resources/auth/gdrive_creds.json", "w") as t_file: | |
| t_file.write(GT) | |
| if udB.get_key("AUTH_TOKEN"): | |
| udB.del_key("AUTH_TOKEN") | |
| MM = udB.get_key("MEGA_MAIL") | |
| MP = udB.get_key("MEGA_PASS") | |
| if MM and MP: | |
| with open(".megarc", "w") as mega: | |
| mega.write(f"[Login]\nUsername = {MM}\nPassword = {MP}") | |
| TZ = udB.get_key("TIMEZONE") | |
| if TZ and timezone: | |
| try: | |
| timezone(TZ) | |
| os.environ["TZ"] = TZ | |
| time.tzset() | |
| except AttributeError as er: | |
| LOGS.debug(er) | |
| except BaseException: | |
| LOGS.critical( | |
| "Incorrect Timezone ,\nCheck Available Timezone From Here https://graph.org/Ultroid-06-18-2\nSo Time is Default UTC" | |
| ) | |
| os.environ["TZ"] = "UTC" | |
| time.tzset() | |
| async def autobot(): | |
| from .. import udB, ultroid_bot | |
| if udB.get_key("BOT_TOKEN"): | |
| return | |
| await ultroid_bot.start() | |
| LOGS.info("MAKING A TELEGRAM BOT FOR YOU AT @BotFather, Kindly Wait") | |
| who = ultroid_bot.me | |
| name = who.first_name + "'s Bot" | |
| if who.username: | |
| username = who.username + "_bot" | |
| else: | |
| username = "ultroid_" + (str(who.id))[5:] + "_bot" | |
| bf = "@BotFather" | |
| await ultroid_bot(UnblockRequest(bf)) | |
| await ultroid_bot.send_message(bf, "/cancel") | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message(bf, "/newbot") | |
| await asyncio.sleep(1) | |
| isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text | |
| if isdone.startswith("That I cannot do.") or "20 bots" in isdone: | |
| LOGS.critical( | |
| "Please make a Bot from @BotFather and add it's token in BOT_TOKEN, as an env var and restart me." | |
| ) | |
| import sys | |
| sys.exit(1) | |
| await ultroid_bot.send_message(bf, name) | |
| await asyncio.sleep(1) | |
| isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text | |
| if not isdone.startswith("Good."): | |
| await ultroid_bot.send_message(bf, "My Assistant Bot") | |
| await asyncio.sleep(1) | |
| isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text | |
| if not isdone.startswith("Good."): | |
| LOGS.critical( | |
| "Please make a Bot from @BotFather and add it's token in BOT_TOKEN, as an env var and restart me." | |
| ) | |
| import sys | |
| sys.exit(1) | |
| await ultroid_bot.send_message(bf, username) | |
| await asyncio.sleep(1) | |
| isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text | |
| await ultroid_bot.send_read_acknowledge("botfather") | |
| if isdone.startswith("Sorry,"): | |
| ran = randint(1, 100) | |
| username = "ultroid_" + (str(who.id))[6:] + str(ran) + "_bot" | |
| await ultroid_bot.send_message(bf, username) | |
| await asyncio.sleep(1) | |
| isdone = (await ultroid_bot.get_messages(bf, limit=1))[0].text | |
| if isdone.startswith("Done!"): | |
| token = isdone.split("`")[1] | |
| udB.set_key("BOT_TOKEN", token) | |
| await enable_inline(ultroid_bot, username) | |
| LOGS.info( | |
| f"Done. Successfully created @{username} to be used as your assistant bot!" | |
| ) | |
| else: | |
| LOGS.info( | |
| "Please Delete Some Of your Telegram bots at @Botfather or Set Var BOT_TOKEN with token of a bot" | |
| ) | |
| import sys | |
| sys.exit(1) | |
| async def autopilot(): | |
| from .. import asst, udB, ultroid_bot | |
| channel = udB.get_key("LOG_CHANNEL") | |
| new_channel = None | |
| if channel: | |
| try: | |
| chat = await ultroid_bot.get_entity(channel) | |
| except BaseException as err: | |
| LOGS.exception(err) | |
| udB.del_key("LOG_CHANNEL") | |
| channel = None | |
| if not channel: | |
| async def _save(exc): | |
| udB._cache["LOG_CHANNEL"] = ultroid_bot.me.id | |
| await asst.send_message( | |
| ultroid_bot.me.id, f"Failed to Create Log Channel due to {exc}.." | |
| ) | |
| if ultroid_bot._bot: | |
| msg_ = "'LOG_CHANNEL' not found! Add it in order to use 'BOTMODE'" | |
| LOGS.error(msg_) | |
| return await _save(msg_) | |
| LOGS.info("Creating a Log Channel for You!") | |
| try: | |
| r = await ultroid_bot( | |
| CreateChannelRequest( | |
| title="My Ultroid Logs", | |
| about="My Ultroid Log Group\n\n Join @TeamUltroid", | |
| megagroup=True, | |
| ), | |
| ) | |
| except ChannelsTooMuchError as er: | |
| LOGS.critical( | |
| "You Are in Too Many Channels & Groups , Leave some And Restart The Bot" | |
| ) | |
| return await _save(str(er)) | |
| except BaseException as er: | |
| LOGS.exception(er) | |
| LOGS.info( | |
| "Something Went Wrong , Create A Group and set its id on config var LOG_CHANNEL." | |
| ) | |
| return await _save(str(er)) | |
| new_channel = True | |
| chat = r.chats[0] | |
| channel = get_peer_id(chat) | |
| udB.set_key("LOG_CHANNEL", channel) | |
| assistant = True | |
| try: | |
| await ultroid_bot.get_permissions(int(channel), asst.me.username) | |
| except UserNotParticipantError: | |
| try: | |
| await ultroid_bot(InviteToChannelRequest(int(channel), [asst.me.username])) | |
| except BaseException as er: | |
| LOGS.info("Error while Adding Assistant to Log Channel") | |
| LOGS.exception(er) | |
| assistant = False | |
| except BaseException as er: | |
| assistant = False | |
| LOGS.exception(er) | |
| if assistant and new_channel: | |
| try: | |
| achat = await asst.get_entity(int(channel)) | |
| except BaseException as er: | |
| achat = None | |
| LOGS.info("Error while getting Log channel from Assistant") | |
| LOGS.exception(er) | |
| if achat and not achat.admin_rights: | |
| rights = ChatAdminRights( | |
| add_admins=True, | |
| invite_users=True, | |
| change_info=True, | |
| ban_users=True, | |
| delete_messages=True, | |
| pin_messages=True, | |
| anonymous=False, | |
| manage_call=True, | |
| ) | |
| try: | |
| await ultroid_bot( | |
| EditAdminRequest( | |
| int(channel), asst.me.username, rights, "Assistant" | |
| ) | |
| ) | |
| except ChatAdminRequiredError: | |
| LOGS.info( | |
| "Failed to promote 'Assistant Bot' in 'Log Channel' due to 'Admin Privileges'" | |
| ) | |
| except BaseException as er: | |
| LOGS.info("Error while promoting assistant in Log Channel..") | |
| LOGS.exception(er) | |
| if isinstance(chat.photo, ChatPhotoEmpty): | |
| photo_path = "channelphoto.jpg" | |
| if not os.path.exists(photo_path): | |
| LOGS.info(f"{photo_path} not found, skipping channel photo upload.") | |
| else: | |
| photo, _ = await download_file( | |
| "https://graph.org/file/27c6812becf6f376cbb10.jpg", photo_path | |
| ) | |
| ll = await ultroid_bot.upload_file(photo) | |
| try: | |
| await ultroid_bot( | |
| EditPhotoRequest(int(channel), InputChatUploadedPhoto(ll)) | |
| ) | |
| except BaseException as er: | |
| LOGS.exception(er) | |
| os.remove(photo) | |
| # customize assistant | |
| async def customize(): | |
| from .. import asst, udB, ultroid_bot | |
| rem = None | |
| try: | |
| chat_id = udB.get_key("LOG_CHANNEL") | |
| if asst.me.photo: | |
| return | |
| LOGS.info("Customising Ur Assistant Bot in @BOTFATHER") | |
| UL = f"@{asst.me.username}" | |
| if not ultroid_bot.me.username: | |
| sir = ultroid_bot.me.first_name | |
| else: | |
| sir = f"@{ultroid_bot.me.username}" | |
| file = random.choice( | |
| [ | |
| "https://graph.org/file/92cd6dbd34b0d1d73a0da.jpg", | |
| "https://graph.org/file/a97973ee0425b523cdc28.jpg", | |
| "resources/extras/ultroid_assistant.jpg", | |
| ] | |
| ) | |
| if not os.path.exists(file): | |
| file, _ = await download_file(file, "profile.jpg") | |
| rem = True | |
| # Check if the file exists and is a valid file before sending | |
| if not (os.path.exists(file) and os.path.isfile(file)): | |
| LOGS.warning(f"Profile photo file '{file}' does not exist or is not a valid file. Skipping customization.") | |
| return | |
| msg = await asst.send_message( | |
| chat_id, "**Auto Customisation** Started on @Botfather" | |
| ) | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message("botfather", "/cancel") | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message("botfather", "/setuserpic") | |
| await asyncio.sleep(1) | |
| isdone = (await ultroid_bot.get_messages("botfather", limit=1))[0].text | |
| if isdone.startswith("Invalid bot"): | |
| LOGS.info("Error while trying to customise assistant, skipping...") | |
| return | |
| await ultroid_bot.send_message("botfather", UL) | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_file("botfather", file) | |
| await asyncio.sleep(2) | |
| await ultroid_bot.send_message("botfather", "/setabouttext") | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message("botfather", UL) | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message( | |
| "botfather", f"β¨ Hello β¨!! I'm Assistant Bot of {sir}" | |
| ) | |
| await asyncio.sleep(2) | |
| await ultroid_bot.send_message("botfather", "/setdescription") | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message("botfather", UL) | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message( | |
| "botfather", | |
| f"β¨ Powerful Ultroid Assistant Bot β¨\nβ¨ Master ~ {sir} β¨\n\nβ¨ Powered By ~ @TeamUltroid β¨", | |
| ) | |
| await asyncio.sleep(2) | |
| await msg.edit("Completed **Auto Customisation** at @BotFather.") | |
| if rem: | |
| os.remove(file) | |
| LOGS.info("Customisation Done") | |
| except Exception as e: | |
| LOGS.exception(e) | |
| async def plug(plugin_channels): | |
| from .. import ultroid_bot | |
| from .utils import load_addons | |
| if ultroid_bot._bot: | |
| LOGS.info("Plugin Channels can't be used in 'BOTMODE'") | |
| return | |
| if os.path.exists("addons") and not os.path.exists("addons/.git"): | |
| shutil.rmtree("addons") | |
| if not os.path.exists("addons"): | |
| os.mkdir("addons") | |
| if not os.path.exists("addons/__init__.py"): | |
| with open("addons/__init__.py", "w") as f: | |
| f.write("from plugins import *\n\nbot = ultroid_bot") | |
| LOGS.info("β’ Loading Plugins from Plugin Channel(s) β’") | |
| for chat in plugin_channels: | |
| LOGS.info(f"{'β’'*4} {chat}") | |
| try: | |
| async for x in ultroid_bot.iter_messages( | |
| chat, search=".py", filter=InputMessagesFilterDocument, wait_time=10 | |
| ): | |
| plugin = "addons/" + x.file.name.replace("_", "-").replace("|", "-") | |
| if not os.path.exists(plugin): | |
| await asyncio.sleep(0.6) | |
| if x.text == "#IGNORE": | |
| continue | |
| plugin = await x.download_media(plugin) | |
| try: | |
| load_addons(plugin) | |
| except Exception as e: | |
| LOGS.info(f"Ultroid - PLUGIN_CHANNEL - ERROR - {plugin}") | |
| LOGS.exception(e) | |
| os.remove(plugin) | |
| except Exception as er: | |
| LOGS.exception(er) | |
| async def ready(): | |
| from .. import asst, udB, ultroid_bot | |
| chat_id = udB.get_key("LOG_CHANNEL") | |
| spam_sent = None | |
| if not udB.get_key("INIT_DEPLOY"): # Detailed Message at Initial Deploy | |
| MSG = """π **Thanks for Deploying Ultroid Userbot!** | |
| β’ Here, are the Some Basic stuff from, where you can Know, about its Usage.""" | |
| PHOTO = "https://graph.org/file/54a917cc9dbb94733ea5f.jpg" | |
| BTTS = Button.inline("β’ Click to Start β’", "initft_2") | |
| udB.set_key("INIT_DEPLOY", "Done") | |
| else: | |
| MSG = f"**Ultroid has been deployed!**\nββββββββββ\n**UserMode**: {inline_mention(ultroid_bot.me)}\n**Assistant**: @{asst.me.username}\nββββββββββ\n**Support**: @TeamUltroid\nββββββββββ" | |
| BTTS, PHOTO = None, None | |
| prev_spam = udB.get_key("LAST_UPDATE_LOG_SPAM") | |
| if prev_spam: | |
| try: | |
| await ultroid_bot.delete_messages(chat_id, int(prev_spam)) | |
| except Exception as E: | |
| LOGS.info("Error while Deleting Previous Update Message :" + str(E)) | |
| if await updater(): | |
| BTTS = Button.inline("Update Available", "updtavail") | |
| try: | |
| spam_sent = await asst.send_message(chat_id, MSG, file=PHOTO, buttons=BTTS) | |
| except ValueError as e: | |
| try: | |
| await (await ultroid_bot.send_message(chat_id, str(e))).delete() | |
| spam_sent = await asst.send_message(chat_id, MSG, file=PHOTO, buttons=BTTS) | |
| except Exception as g: | |
| LOGS.info(g) | |
| except Exception as el: | |
| LOGS.info(el) | |
| try: | |
| spam_sent = await ultroid_bot.send_message(chat_id, MSG) | |
| except Exception as ef: | |
| LOGS.exception(ef) | |
| if spam_sent and not spam_sent.media: | |
| udB.set_key("LAST_UPDATE_LOG_SPAM", spam_sent.id) | |
| try: | |
| await ultroid_bot(JoinChannelRequest("TheUltroid")) | |
| except Exception as er: | |
| LOGS.exception(er) | |
| async def WasItRestart(udb): | |
| key = udb.get_key("_RESTART") | |
| if not key: | |
| return | |
| from .. import asst, ultroid_bot | |
| try: | |
| data = key.split("_") | |
| who = asst if data[0] == "bot" else ultroid_bot | |
| await who.edit_message( | |
| int(data[1]), int(data[2]), "__Restarted Successfully.__" | |
| ) | |
| except Exception as er: | |
| LOGS.exception(er) | |
| udb.del_key("_RESTART") | |
| def _version_changes(udb): | |
| for _ in [ | |
| "BOT_USERS", | |
| "BOT_BLS", | |
| "VC_SUDOS", | |
| "SUDOS", | |
| "CLEANCHAT", | |
| "LOGUSERS", | |
| "PLUGIN_CHANNEL", | |
| "CH_SOURCE", | |
| "CH_DESTINATION", | |
| "BROADCAST", | |
| ]: | |
| key = udb.get_key(_) | |
| if key and str(key)[0] != "[": | |
| key = udb.get(_) | |
| new_ = [ | |
| int(z) if z.isdigit() or (z.startswith("-") and z[1:].isdigit()) else z | |
| for z in key.split() | |
| ] | |
| udb.set_key(_, new_) | |
| async def enable_inline(ultroid_bot, username): | |
| bf = "BotFather" | |
| await ultroid_bot.send_message(bf, "/setinline") | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message(bf, f"@{username}") | |
| await asyncio.sleep(1) | |
| await ultroid_bot.send_message(bf, "Search") | |
| await ultroid_bot.send_read_acknowledge(bf) | |