Spaces:
Runtime error
Runtime error
| # This file is part of the AutoAnime distribution. | |
| # Copyright (c) 2025 Kaif_00z | |
| # | |
| # This program is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation, version 3. | |
| # | |
| # This program is distributed in the hope that it will be useful, but | |
| # WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
| # General Public License as published by the Free Software Foundation, version 3. | |
| # | |
| # License can be found in < | |
| # https://github.com/kaif-00z/AutoAnimeBot/blob/main/LICENSE > . | |
| # if you are using this following code then don't forgot to give proper | |
| # credit to t.me/kAiF_00z (github.com/kaif-00z) | |
| # little bit inspired from pyUltroid.BaseClient | |
| import asyncio | |
| import sys | |
| from logging import Logger | |
| from traceback import format_exc | |
| from pyrogram import Client, utils | |
| from telethon import TelegramClient | |
| from telethon.errors import ( | |
| AccessTokenExpiredError, | |
| AccessTokenInvalidError, | |
| ApiIdInvalidError, | |
| AuthKeyDuplicatedError, | |
| ConnectionError, | |
| ) | |
| from telethon.sessions import StringSession | |
| from telethon.utils import get_display_name | |
| from libs.logger import LOGS, TelethonLogger | |
| from functions.config import Var | |
| class UserBot(TelegramClient): | |
| def __init__( | |
| self, | |
| session_string=None, | |
| api_id=None, | |
| api_hash=None, | |
| logger: Logger = LOGS, | |
| log_attempt=True, | |
| exit_on_error=True, | |
| *args, | |
| **kwargs, | |
| ): | |
| self._handle_error = exit_on_error | |
| self._log_at = log_attempt | |
| self.logger = logger | |
| # Validate required parameters for userbot | |
| session_string = session_string or Var.SESSION | |
| if not session_string: | |
| self.logger.critical("SESSION string is required for userbot!") | |
| sys.exit(1) | |
| api_id = api_id or Var.API_ID | |
| api_hash = api_hash or Var.API_HASH | |
| if not api_id or not api_hash: | |
| self.logger.critical("API_ID and API_HASH are required for userbot!") | |
| sys.exit(1) | |
| kwargs["api_id"] = api_id | |
| kwargs["api_hash"] = api_hash | |
| kwargs["base_logger"] = TelethonLogger | |
| kwargs["connection_retries"] = 3 | |
| kwargs["timeout"] = 30 | |
| utils.MIN_CHANNEL_ID = -1002881690459 | |
| # Initialize as user account (not bot) | |
| super().__init__(StringSession(session_string), **kwargs) | |
| # Initialize Pyrogram client for user account | |
| self.pyro_client = Client( | |
| name="pekka_user", | |
| api_id=api_id, | |
| api_hash=api_hash, | |
| session_string=session_string, | |
| in_memory=True, | |
| ) | |
| self.run_in_loop(self.start_client()) | |
| def __repr__(self): | |
| return "<AutoAnimeUserBot.Client :\n user: {}\n>".format(self._bot) | |
| async def start_client(self, retry_count=0, **kwargs): | |
| """function to start userbot client""" | |
| if self._log_at: | |
| self.logger.info("Trying to login as user...") | |
| # Maximum retry limit to prevent infinite loops | |
| max_retries = 3 | |
| if retry_count >= max_retries: | |
| self.logger.critical(f"Failed to connect after {max_retries} attempts. Exiting.") | |
| sys.exit(1) | |
| try: | |
| await self.start() | |
| await self.pyro_client.start() | |
| # Get user info | |
| me = await self.get_me() | |
| self.logger.info(f"Logged in as: {get_display_name(me)} (@{me.username})") | |
| except ApiIdInvalidError: | |
| self.logger.critical("API ID and API_HASH combination does not match!") | |
| sys.exit(1) | |
| except (AuthKeyDuplicatedError, EOFError): | |
| if self._handle_error: | |
| self.logger.critical("String session expired. Create new!") | |
| sys.exit(1) | |
| self.logger.critical("String session expired.") | |
| except ConnectionError as e: | |
| self.logger.error(f"Connection failed: {e}") | |
| if retry_count < max_retries - 1: | |
| self.logger.info(f"Retrying connection in 10 seconds... (Attempt {retry_count + 1}/{max_retries})") | |
| await asyncio.sleep(10) | |
| await self.start_client(retry_count=retry_count + 1) | |
| else: | |
| self.logger.warning("Max retries reached. HF Spaces detected - running in offline mode...") | |
| self.logger.info("UserBot running in offline mode (HF Spaces compatibility)") | |
| return | |
| except (AccessTokenExpiredError, AccessTokenInvalidError): | |
| self.logger.critical("Session expired or invalid. Create new session!") | |
| sys.exit(1) | |
| except Exception as e: | |
| self.logger.critical(f"Unexpected error: {e}") | |
| if self._handle_error: | |
| sys.exit(1) | |
| def run_in_loop(self, function): | |
| """Run function in event loop""" | |
| try: | |
| return self.loop.run_until_complete(function) | |
| except KeyboardInterrupt: | |
| self.logger.info("UserBot stopped by user") | |
| except Exception as e: | |
| self.logger.error(f"Error in event loop: {e}") | |
| if self._handle_error: | |
| sys.exit(1) | |
| async def send_message_to_channel(self, channel_id, message, **kwargs): | |
| """Send message to channel as user""" | |
| try: | |
| return await self.send_message(channel_id, message, **kwargs) | |
| except Exception as e: | |
| self.logger.error(f"Error sending message to channel: {e}") | |
| return None | |
| async def send_file_to_channel(self, channel_id, file, **kwargs): | |
| """Send file to channel as user""" | |
| try: | |
| return await self.send_file(channel_id, file, **kwargs) | |
| except Exception as e: | |
| self.logger.error(f"Error sending file to channel: {e}") | |
| return None | |
| async def get_channel_messages(self, channel_id, limit=10): | |
| """Get messages from channel""" | |
| try: | |
| messages = [] | |
| async for message in self.iter_messages(channel_id, limit=limit): | |
| messages.append(message) | |
| return messages | |
| except Exception as e: | |
| self.logger.error(f"Error getting channel messages: {e}") | |
| return [] | |
| async def is_user_in_channel(self, channel_id, user_id): | |
| """Check if user is in channel""" | |
| try: | |
| participant = await self.get_participants(channel_id, filter=lambda p: p.id == user_id) | |
| return len(participant) > 0 | |
| except Exception as e: | |
| self.logger.error(f"Error checking user in channel: {e}") | |
| return False | |
| def run(self): | |
| """Start the userbot""" | |
| try: | |
| self.logger.info("Starting AutoAnime UserBot...") | |
| self.run_until_disconnected() | |
| except KeyboardInterrupt: | |
| self.logger.info("UserBot stopped by user") | |
| except Exception as e: | |
| self.logger.error(f"Error running userbot: {e}") | |
| if self._handle_error: | |
| sys.exit(1) | |