Spaces:
Paused
Paused
| # Ultroid - UserBot | |
| # Copyright (C) 2021-2025 TeamUltroid | |
| # | |
| # This file is a part of < https://github.com/TeamUltroid/Ultroid/ > | |
| # PLease read the GNU Affero General Public License in | |
| # <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>. | |
| import contextlib | |
| import inspect | |
| import sys | |
| import time | |
| from logging import Logger | |
| from telethonpatch import TelegramClient | |
| from telethon import utils as telethon_utils | |
| from telethon.errors import ( | |
| AccessTokenExpiredError, | |
| AccessTokenInvalidError, | |
| ApiIdInvalidError, | |
| AuthKeyDuplicatedError, | |
| ) | |
| from ..configs import Var | |
| from . import * | |
| class UltroidClient(TelegramClient): | |
| def __init__( | |
| self, | |
| session, | |
| api_id=None, | |
| api_hash=None, | |
| bot_token=None, | |
| udB=None, | |
| logger: Logger = LOGS, | |
| log_attempt=True, | |
| exit_on_error=True, | |
| *args, | |
| **kwargs, | |
| ): | |
| self._cache = {} | |
| self._dialogs = [] | |
| self._handle_error = exit_on_error | |
| self._log_at = log_attempt | |
| self.logger = logger | |
| self.udB = udB | |
| kwargs["api_id"] = api_id or Var.API_ID | |
| kwargs["api_hash"] = api_hash or Var.API_HASH | |
| kwargs["base_logger"] = TelethonLogger | |
| super().__init__(session, **kwargs) | |
| self.run_in_loop(self.start_client(bot_token=bot_token)) | |
| self.dc_id = self.session.dc_id | |
| def __repr__(self): | |
| return f"<Ultroid.Client :\n self: {self.full_name}\n bot: {self._bot}\n>" | |
| def __dict__(self): | |
| if self.me: | |
| return self.me.to_dict() | |
| async def start_client(self, **kwargs): | |
| """function to start client""" | |
| if self._log_at: | |
| self.logger.info("Trying to login.") | |
| try: | |
| await self.start(**kwargs) | |
| except ApiIdInvalidError: | |
| self.logger.critical("API ID and API_HASH combination does not match!") | |
| sys.exit() | |
| except (AuthKeyDuplicatedError, EOFError) as er: | |
| if self._handle_error: | |
| self.logger.critical("String session expired. Create new!") | |
| return sys.exit() | |
| self.logger.critical("String session expired.") | |
| except (AccessTokenExpiredError, AccessTokenInvalidError): | |
| # AccessTokenError can only occur for Bot account | |
| # And at Early Process, Its saved in DB. | |
| self.udB.del_key("BOT_TOKEN") | |
| self.logger.critical( | |
| "Bot token is expired or invalid. Create new from @Botfather and add in BOT_TOKEN env variable!" | |
| ) | |
| sys.exit() | |
| # Save some stuff for later use... | |
| self.me = await self.get_me() | |
| if self.me.bot: | |
| me = f"@{self.me.username}" | |
| else: | |
| setattr(self.me, "phone", None) | |
| me = self.full_name | |
| if self._log_at: | |
| self.logger.info(f"Logged in as {me}") | |
| self._bot = await self.is_bot() | |
| async def fast_uploader(self, file, **kwargs): | |
| """Upload files in a faster way""" | |
| import os | |
| from pathlib import Path | |
| start_time = time.time() | |
| path = Path(file) | |
| filename = kwargs.get("filename", path.name) | |
| # Set to True and pass event to show progress bar. | |
| show_progress = kwargs.get("show_progress", False) | |
| if show_progress: | |
| event = kwargs["event"] | |
| # Whether to use cached file for uploading or not | |
| use_cache = kwargs.get("use_cache", True) | |
| # Delete original file after uploading | |
| to_delete = kwargs.get("to_delete", False) | |
| message = kwargs.get("message", f"Uploading {filename}...") | |
| by_bot = self._bot | |
| size = os.path.getsize(file) | |
| # Don't show progress bar when file size is less than 5MB. | |
| if size < 5 * 2 ** 20: | |
| show_progress = False | |
| if use_cache and self._cache and self._cache.get("upload_cache"): | |
| for files in self._cache["upload_cache"]: | |
| if ( | |
| files["size"] == size | |
| and files["path"] == path | |
| and files["name"] == filename | |
| and files["by_bot"] == by_bot | |
| ): | |
| if to_delete: | |
| with contextlib.suppress(FileNotFoundError): | |
| os.remove(file) | |
| return files["raw_file"], time.time() - start_time | |
| from pyUltroid.fns.FastTelethon import upload_file | |
| from pyUltroid.fns.helper import progress | |
| raw_file = None | |
| while not raw_file: | |
| with open(file, "rb") as f: | |
| raw_file = await upload_file( | |
| client=self, | |
| file=f, | |
| filename=filename, | |
| progress_callback=( | |
| lambda completed, total: self.loop.create_task( | |
| progress(completed, total, event, start_time, message) | |
| ) | |
| ) | |
| if show_progress | |
| else None, | |
| ) | |
| cache = { | |
| "by_bot": by_bot, | |
| "size": size, | |
| "path": path, | |
| "name": filename, | |
| "raw_file": raw_file, | |
| } | |
| if self._cache.get("upload_cache"): | |
| self._cache["upload_cache"].append(cache) | |
| else: | |
| self._cache.update({"upload_cache": [cache]}) | |
| if to_delete: | |
| with contextlib.suppress(FileNotFoundError): | |
| os.remove(file) | |
| return raw_file, time.time() - start_time | |
| async def fast_downloader(self, file, **kwargs): | |
| """Download files in a faster way""" | |
| # Set to True and pass event to show progress bar. | |
| show_progress = kwargs.get("show_progress", False) | |
| filename = kwargs.get("filename", "") | |
| if show_progress: | |
| event = kwargs["event"] | |
| # Don't show progress bar when file size is less than 10MB. | |
| if file.size < 10 * 2 ** 20: | |
| show_progress = False | |
| import mimetypes | |
| from telethon.tl.types import DocumentAttributeFilename | |
| from pyUltroid.fns.FastTelethon import download_file | |
| from pyUltroid.fns.helper import progress | |
| start_time = time.time() | |
| # Auto-generate Filename | |
| if not filename: | |
| try: | |
| if isinstance(file.attributes[-1], DocumentAttributeFilename): | |
| filename = file.attributes[-1].file_name | |
| except IndexError: | |
| mimetype = file.mime_type | |
| filename = ( | |
| mimetype.split("/")[0] | |
| + "-" | |
| + str(round(start_time)) | |
| + mimetypes.guess_extension(mimetype) | |
| ) | |
| message = kwargs.get("message", f"Downloading {filename}...") | |
| raw_file = None | |
| while not raw_file: | |
| with open(filename, "wb") as f: | |
| raw_file = await download_file( | |
| client=self, | |
| location=file, | |
| out=f, | |
| progress_callback=( | |
| lambda completed, total: self.loop.create_task( | |
| progress(completed, total, event, start_time, message) | |
| ) | |
| ) | |
| if show_progress | |
| else None, | |
| ) | |
| return raw_file, time.time() - start_time | |
| def run_in_loop(self, function): | |
| """run inside asyncio loop""" | |
| return self.loop.run_until_complete(function) | |
| def run(self): | |
| """run asyncio loop""" | |
| self.run_until_disconnected() | |
| def add_handler(self, func, *args, **kwargs): | |
| """Add new event handler, ignoring if exists""" | |
| if func in [_[0] for _ in self.list_event_handlers()]: | |
| return | |
| self.add_event_handler(func, *args, **kwargs) | |
| def utils(self): | |
| return telethon_utils | |
| def full_name(self): | |
| """full name of Client""" | |
| return self.utils.get_display_name(self.me) | |
| def uid(self): | |
| """Client's user id""" | |
| return self.me.id | |
| def to_dict(self): | |
| return dict(inspect.getmembers(self)) | |
| async def parse_id(self, text): | |
| with contextlib.suppress(ValueError): | |
| text = int(text) | |
| return await self.get_peer_id(text) | |