# File: ongoing-anime/core/userbot.py
# Uploader avatar caption: "dragxd's picture"
# Commit message: Convert bot to userbot: Remove BOT_TOKEN dependency, use SESSION string for user account
# Commit: 7a6d557
# This file is part of the AutoAnime distribution.
# Copyright (c) 2025 Kaif_00z
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License as published by the Free Software Foundation, version 3.
#
# License can be found in <
# https://github.com/kaif-00z/AutoAnimeBot/blob/main/LICENSE > .
# if you are using this following code then don't forgot to give proper
# credit to t.me/kAiF_00z (github.com/kaif-00z)
# little bit inspired from pyUltroid.BaseClient
import asyncio
import sys
from logging import Logger
from traceback import format_exc
from pyrogram import Client, utils
from telethon import TelegramClient
from telethon.errors import (
AccessTokenExpiredError,
AccessTokenInvalidError,
ApiIdInvalidError,
AuthKeyDuplicatedError,
ConnectionError,
)
from telethon.sessions import StringSession
from telethon.utils import get_display_name
from libs.logger import LOGS, TelethonLogger
from functions.config import Var
class UserBot(TelegramClient):
    """Telethon client that logs in as a user account from a string session.

    Construction is eager: ``__init__`` builds the Telethon client, builds a
    companion Pyrogram ``Client`` (exposed as ``pyro_client``) and then runs
    :meth:`start_client` to completion on the client's event loop.

    Attributes:
        logger: Logger used for all status and error messages.
        pyro_client: Pyrogram client created with the same credentials.
        me: The Telethon user object of the logged-in account, or ``None``
            until login has succeeded.
    """

    def __init__(
        self,
        session_string=None,
        api_id=None,
        api_hash=None,
        logger: Logger = LOGS,
        log_attempt=True,
        exit_on_error=True,
        *args,
        **kwargs,
    ):
        """Create both clients and log in immediately.

        Args:
            session_string: String session; falls back to ``Var.SESSION``.
            api_id: Telegram API id; falls back to ``Var.API_ID``.
            api_hash: Telegram API hash; falls back to ``Var.API_HASH``.
            logger: Logger for status and error output.
            log_attempt: When true, log every login attempt.
            exit_on_error: When true, fatal errors terminate the process via
                ``sys.exit(1)`` instead of only being logged.
            *args: Accepted for signature compatibility; not forwarded.
            **kwargs: Extra keyword arguments forwarded to ``TelegramClient``.
                ``api_id``, ``api_hash``, ``base_logger``,
                ``connection_retries`` and ``timeout`` are always overwritten.

        Raises:
            SystemExit: If the session string or the API credentials are
                missing.
        """
        self._handle_error = exit_on_error
        self._log_at = log_attempt
        self.logger = logger

        # A user account cannot log in without a session string.
        session_string = session_string or Var.SESSION
        if not session_string:
            self.logger.critical("SESSION string is required for userbot!")
            sys.exit(1)

        api_id = api_id or Var.API_ID
        api_hash = api_hash or Var.API_HASH
        if not api_id or not api_hash:
            self.logger.critical("API_ID and API_HASH are required for userbot!")
            sys.exit(1)

        kwargs["api_id"] = api_id
        kwargs["api_hash"] = api_hash
        kwargs["base_logger"] = TelethonLogger
        kwargs["connection_retries"] = 3
        kwargs["timeout"] = 30

        # Overrides a module-level bound in pyrogram.utils for the whole
        # process. Presumably needed so Pyrogram accepts newer (larger)
        # channel ids - TODO confirm why this exact value was chosen.
        utils.MIN_CHANNEL_ID = -1002881690459

        # No bot token is passed anywhere: this is a user account login.
        super().__init__(StringSession(session_string), **kwargs)

        # Filled in by start_client() once the login has succeeded.
        self.me = None

        # NOTE(review): the same string is handed to Telethon above and to
        # Pyrogram here. The two libraries normally use different
        # session-string encodings - confirm that one value works for both.
        self.pyro_client = Client(
            name="pekka_user",
            api_id=api_id,
            api_hash=api_hash,
            session_string=session_string,
            in_memory=True,
        )
        self.run_in_loop(self.start_client())

    def __repr__(self):
        """Return a short description naming the logged-in user, if any."""
        # getattr keeps repr() usable even if it is called before __init__
        # has finished (for example from a log line or a debugger).
        me = getattr(self, "me", None)
        user = get_display_name(me) if me is not None else None
        return "<AutoAnimeUserBot.Client :\n user: {}\n>".format(user)

    async def start_client(self, retry_count=0, **kwargs):
        """Start the Telethon and Pyrogram clients, retrying on connection errors.

        Args:
            retry_count: Number of attempts already made. Attempts are counted
                from this value up to a fixed limit of three.
            **kwargs: Accepted for signature compatibility; unused.

        Returns:
            None. On success ``self.me`` holds the logged-in user. If every
            attempt fails with a connection error the method logs a warning
            and returns without being logged in ("offline mode").

        Raises:
            SystemExit: On invalid API credentials, on an invalid or expired
                session, when ``retry_count`` is already at the limit, and on
                any other error when ``exit_on_error`` was set.
        """
        max_retries = 3
        attempt = retry_count

        # A loop instead of awaiting ourselves from inside the except handler:
        # same sequence of log lines and sleeps, without nested handlers.
        while True:
            if self._log_at:
                self.logger.info("Trying to login as user...")

            if attempt >= max_retries:
                self.logger.critical(
                    f"Failed to connect after {max_retries} attempts. Exiting."
                )
                sys.exit(1)

            try:
                await self.start()
                await self.pyro_client.start()
                self.me = await self.get_me()
                # Accounts without a public username would otherwise be
                # logged as "@None".
                handle = f" (@{self.me.username})" if self.me.username else ""
                self.logger.info(
                    f"Logged in as: {get_display_name(self.me)}{handle}"
                )
                return
            except ApiIdInvalidError:
                self.logger.critical(
                    "API ID and API_HASH combination does not match!"
                )
                sys.exit(1)
            except (AuthKeyDuplicatedError, EOFError):
                if self._handle_error:
                    self.logger.critical("String session expired. Create new!")
                    sys.exit(1)
                self.logger.critical("String session expired.")
                return
            except ConnectionError as e:
                self.logger.error(f"Connection failed: {e}")
                if attempt >= max_retries - 1:
                    self.logger.warning(
                        "Max retries reached. HF Spaces detected - running in offline mode..."
                    )
                    self.logger.info(
                        "UserBot running in offline mode (HF Spaces compatibility)"
                    )
                    return
                self.logger.info(
                    f"Retrying connection in 10 seconds... (Attempt {attempt + 1}/{max_retries})"
                )
                await asyncio.sleep(10)
                attempt += 1
            except (AccessTokenExpiredError, AccessTokenInvalidError):
                self.logger.critical(
                    "Session expired or invalid. Create new session!"
                )
                sys.exit(1)
            except Exception as e:
                # exc_info keeps the traceback; the message alone is rarely
                # enough to diagnose an unexpected failure.
                self.logger.critical(f"Unexpected error: {e}", exc_info=True)
                if self._handle_error:
                    sys.exit(1)
                return

    def run_in_loop(self, function):
        """Run an awaitable to completion on this client's event loop.

        Args:
            function: The awaitable (typically a coroutine object) to run.

        Returns:
            The awaitable's result, or ``None`` if it was interrupted or
            failed while ``exit_on_error`` is off.

        Raises:
            SystemExit: If the awaitable raises and ``exit_on_error`` was set.
        """
        try:
            return self.loop.run_until_complete(function)
        except KeyboardInterrupt:
            self.logger.info("UserBot stopped by user")
        except Exception as e:
            self.logger.error(f"Error in event loop: {e}", exc_info=True)
            if self._handle_error:
                sys.exit(1)

    async def send_message_to_channel(self, channel_id, message, **kwargs):
        """Send ``message`` to ``channel_id`` as the logged-in user.

        Extra keyword arguments are forwarded to ``send_message``.

        Returns:
            The value returned by ``send_message``, or ``None`` if sending
            failed (the error is logged, not raised).
        """
        try:
            return await self.send_message(channel_id, message, **kwargs)
        except Exception as e:
            self.logger.error(f"Error sending message to channel: {e}")
            return None

    async def send_file_to_channel(self, channel_id, file, **kwargs):
        """Send ``file`` to ``channel_id`` as the logged-in user.

        Extra keyword arguments are forwarded to ``send_file``.

        Returns:
            The value returned by ``send_file``, or ``None`` if sending
            failed (the error is logged, not raised).
        """
        try:
            return await self.send_file(channel_id, file, **kwargs)
        except Exception as e:
            self.logger.error(f"Error sending file to channel: {e}")
            return None

    async def get_channel_messages(self, channel_id, limit=10):
        """Collect up to ``limit`` messages from ``channel_id`` into a list.

        Returns:
            A list of the messages yielded by ``iter_messages``, or an empty
            list if fetching failed (the error is logged, not raised).
        """
        try:
            return [
                message
                async for message in self.iter_messages(channel_id, limit=limit)
            ]
        except Exception as e:
            self.logger.error(f"Error getting channel messages: {e}")
            return []

    async def is_user_in_channel(self, channel_id, user_id):
        """Report whether ``user_id`` is currently a member of ``channel_id``.

        The membership record of that one user is requested directly instead
        of listing participants: ``get_participants(filter=...)`` expects a
        Telegram participants-filter type, not a Python predicate.

        Returns:
            ``True`` if the user is a current participant; ``False`` if the
            user is not a participant, has left or was removed, or if the
            lookup failed (failures are logged, not raised).
        """
        # Local import so this method does not depend on an edit to the
        # file-level import list.
        from telethon.errors import UserNotParticipantError

        try:
            permissions = await self.get_permissions(channel_id, user_id)
        except UserNotParticipantError:
            # Ordinary "not a member" answer, not an error worth logging.
            return False
        except Exception as e:
            self.logger.error(f"Error checking user in channel: {e}")
            return False

        # Removed users can still have a participant record, flagged `left`.
        removed = getattr(permissions.participant, "left", False)
        return not (permissions.has_left or removed)

    def run(self):
        """Block until the client disconnects.

        Raises:
            SystemExit: If running fails and ``exit_on_error`` was set.
        """
        try:
            self.logger.info("Starting AutoAnime UserBot...")
            self.run_until_disconnected()
        except KeyboardInterrupt:
            self.logger.info("UserBot stopped by user")
        except Exception as e:
            self.logger.error(f"Error running userbot: {e}", exc_info=True)
            if self._handle_error:
                sys.exit(1)