# This file is part of the AutoAnime distribution. # Copyright (c) 2025 Kaif_00z # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License as published by the Free Software Foundation, version 3. # # License can be found in < # https://github.com/kaif-00z/AutoAnimeBot/blob/main/LICENSE > . # if you are using this following code then don't forgot to give proper # credit to t.me/kAiF_00z (github.com/kaif-00z) # little bit inspired from pyUltroid.BaseClient import asyncio import sys from logging import Logger from traceback import format_exc from pyrogram import Client, utils from telethon import TelegramClient from telethon.errors import ( AccessTokenExpiredError, AccessTokenInvalidError, ApiIdInvalidError, AuthKeyDuplicatedError, ConnectionError, ) from telethon.sessions import StringSession from telethon.utils import get_display_name from libs.logger import LOGS, TelethonLogger from functions.config import Var class UserBot(TelegramClient): def __init__( self, session_string=None, api_id=None, api_hash=None, logger: Logger = LOGS, log_attempt=True, exit_on_error=True, *args, **kwargs, ): self._handle_error = exit_on_error self._log_at = log_attempt self.logger = logger # Validate required parameters for userbot session_string = session_string or Var.SESSION if not session_string: self.logger.critical("SESSION string is required for userbot!") sys.exit(1) api_id = api_id or Var.API_ID api_hash = api_hash or Var.API_HASH if not api_id or not api_hash: self.logger.critical("API_ID and API_HASH are required for userbot!") sys.exit(1) kwargs["api_id"] = api_id kwargs["api_hash"] = api_hash kwargs["base_logger"] = TelethonLogger kwargs["connection_retries"] = 3 kwargs["timeout"] = 30 utils.MIN_CHANNEL_ID = -1002881690459 # Initialize as user account (not bot) super().__init__(StringSession(session_string), **kwargs) # Initialize Pyrogram client for user account self.pyro_client = Client( name="pekka_user", api_id=api_id, api_hash=api_hash, session_string=session_string, in_memory=True, ) self.run_in_loop(self.start_client()) def __repr__(self): return "".format(self._bot) async def start_client(self, retry_count=0, **kwargs): """function to start userbot client""" if self._log_at: self.logger.info("Trying to login as user...") # Maximum retry limit to prevent infinite loops max_retries = 3 if retry_count >= max_retries: self.logger.critical(f"Failed to connect after {max_retries} attempts. Exiting.") sys.exit(1) try: await self.start() await self.pyro_client.start() # Get user info me = await self.get_me() self.logger.info(f"Logged in as: {get_display_name(me)} (@{me.username})") except ApiIdInvalidError: self.logger.critical("API ID and API_HASH combination does not match!") sys.exit(1) except (AuthKeyDuplicatedError, EOFError): if self._handle_error: self.logger.critical("String session expired. Create new!") sys.exit(1) self.logger.critical("String session expired.") except ConnectionError as e: self.logger.error(f"Connection failed: {e}") if retry_count < max_retries - 1: self.logger.info(f"Retrying connection in 10 seconds... (Attempt {retry_count + 1}/{max_retries})") await asyncio.sleep(10) await self.start_client(retry_count=retry_count + 1) else: self.logger.warning("Max retries reached. HF Spaces detected - running in offline mode...") self.logger.info("UserBot running in offline mode (HF Spaces compatibility)") return except (AccessTokenExpiredError, AccessTokenInvalidError): self.logger.critical("Session expired or invalid. Create new session!") sys.exit(1) except Exception as e: self.logger.critical(f"Unexpected error: {e}") if self._handle_error: sys.exit(1) def run_in_loop(self, function): """Run function in event loop""" try: return self.loop.run_until_complete(function) except KeyboardInterrupt: self.logger.info("UserBot stopped by user") except Exception as e: self.logger.error(f"Error in event loop: {e}") if self._handle_error: sys.exit(1) async def send_message_to_channel(self, channel_id, message, **kwargs): """Send message to channel as user""" try: return await self.send_message(channel_id, message, **kwargs) except Exception as e: self.logger.error(f"Error sending message to channel: {e}") return None async def send_file_to_channel(self, channel_id, file, **kwargs): """Send file to channel as user""" try: return await self.send_file(channel_id, file, **kwargs) except Exception as e: self.logger.error(f"Error sending file to channel: {e}") return None async def get_channel_messages(self, channel_id, limit=10): """Get messages from channel""" try: messages = [] async for message in self.iter_messages(channel_id, limit=limit): messages.append(message) return messages except Exception as e: self.logger.error(f"Error getting channel messages: {e}") return [] async def is_user_in_channel(self, channel_id, user_id): """Check if user is in channel""" try: participant = await self.get_participants(channel_id, filter=lambda p: p.id == user_id) return len(participant) > 0 except Exception as e: self.logger.error(f"Error checking user in channel: {e}") return False def run(self): """Start the userbot""" try: self.logger.info("Starting AutoAnime UserBot...") self.run_until_disconnected() except KeyboardInterrupt: self.logger.info("UserBot stopped by user") except Exception as e: self.logger.error(f"Error running userbot: {e}") if self._handle_error: sys.exit(1)