# # Copyright (C) 2021-2022 by TeamYukki@Github, < https://github.com/TeamYukki >. # # This file is part of < https://github.com/TeamYukki/YukkiMusicBot > project, # and is released under the "GNU v3.0 License Agreement". # Please see < https://github.com/TeamYukki/YukkiMusicBot/blob/master/LICENSE > # # All rights reserved. import asyncio from pyrogram import filters from pyrogram.types import (InlineKeyboardButton, InlineKeyboardMarkup, Message) from youtubesearchpython.__future__ import VideosSearch import config from config import BANNED_USERS from config.config import OWNER_ID from strings import get_command, get_string from YukkiMusic import Telegram, YouTube, app from YukkiMusic.misc import SUDOERS from YukkiMusic.plugins.play.playlist import del_plist_msg from YukkiMusic.plugins.sudo.sudoers import sudoers_list from YukkiMusic.utils.database import (add_served_chat, add_served_user, blacklisted_chats, get_assistant, get_lang, get_userss, is_on_off, is_served_private_chat) from YukkiMusic.utils.decorators.language import LanguageStart from YukkiMusic.utils.inline import (help_pannel, private_panel, start_pannel) loop = asyncio.get_running_loop() @app.on_message( filters.command(get_command("START_COMMAND")) & filters.private & ~BANNED_USERS ) @LanguageStart async def start_comm(client, message: Message, _): await add_served_user(message.from_user.id) if len(message.text.split()) > 1: name = message.text.split(None, 1)[1] if name[0:4] == "help": keyboard = help_pannel(_) return await message.reply_text( _["help_1"], reply_markup=keyboard ) if name[0:4] == "song": return await message.reply_text(_["song_2"]) if name[0:3] == "sta": m = await message.reply_text( "🔎 Fetching your personal stats.!" ) stats = await get_userss(message.from_user.id) tot = len(stats) if not stats: await asyncio.sleep(1) return await m.edit(_["ustats_1"]) def get_stats(): msg = "" limit = 0 results = {} for i in stats: top_list = stats[i]["spot"] results[str(i)] = top_list list_arranged = dict( sorted( results.items(), key=lambda item: item[1], reverse=True, ) ) if not results: return m.edit(_["ustats_1"]) tota = 0 videoid = None for vidid, count in list_arranged.items(): tota += count if limit == 10: continue if limit == 0: videoid = vidid limit += 1 details = stats.get(vidid) title = (details["title"][:35]).title() if vidid == "telegram": msg += f"🔗[Telegram Files and Audios](https://t.me/telegram) ** played {count} times**\n\n" else: msg += f"🔗 [{title}](https://www.youtube.com/watch?v={vidid}) ** played {count} times**\n\n" msg = _["ustats_2"].format(tot, tota, limit) + msg return videoid, msg try: videoid, msg = await loop.run_in_executor( None, get_stats ) except Exception as e: print(e) return thumbnail = await YouTube.thumbnail(videoid, True) await m.delete() await message.reply_photo(photo=thumbnail, caption=msg) return if name[0:3] == "sud": await sudoers_list(client=client, message=message, _=_) if await is_on_off(config.LOG): sender_id = message.from_user.id sender_name = message.from_user.first_name return await app.send_message( config.LOG_GROUP_ID, f"{message.from_user.mention} has just started bot to check SUDOLIST\n\n**USER ID:** {sender_id}\n**USER NAME:** {sender_name}", ) return if name[0:3] == "lyr": query = (str(name)).replace("lyrics_", "", 1) lyrical = config.lyrical lyrics = lyrical.get(query) if lyrics: return await Telegram.send_split_text(message, lyrics) else: return await message.reply_text( "Failed to get lyrics." ) if name[0:3] == "del": await del_plist_msg(client=client, message=message, _=_) if name[0:3] == "inf": m = await message.reply_text("🔎 Fetching Info!") query = (str(name)).replace("info_", "", 1) query = f"https://www.youtube.com/watch?v={query}" results = VideosSearch(query, limit=1) for result in (await results.next())["result"]: title = result["title"] duration = result["duration"] views = result["viewCount"]["short"] thumbnail = result["thumbnails"][0]["url"].split("?")[ 0 ] channellink = result["channel"]["link"] channel = result["channel"]["name"] link = result["link"] published = result["publishedTime"] searched_text = f""" 🔍__**Video Track Information**__ ❇️**Title:** {title} ⏳**Duration:** {duration} Mins 👀**Views:** `{views}` ⏰**Published Time:** {published} 🎥**Channel Name:** {channel} 📎**Channel Link:** [Visit From Here]({channellink}) 🔗**Video Link:** [Link]({link}) ⚡️ __Searched Powered By {config.MUSIC_BOT_NAME}__""" key = InlineKeyboardMarkup( [ [ InlineKeyboardButton( text="🎥 Watch ", url=f"{link}" ), InlineKeyboardButton( text="🔄 Close", callback_data="close" ), ], ] ) await m.delete() await app.send_photo( message.chat.id, photo=thumbnail, caption=searched_text, reply_markup=key, ) if await is_on_off(config.LOG): sender_id = message.from_user.id sender_name = message.from_user.first_name return await app.send_message( config.LOG_GROUP_ID, f"{message.from_user.mention} has just started bot to check VIDEO INFORMATION\n\n**USER ID:** {sender_id}\n**USER NAME:** {sender_name}", ) else: try: await app.resolve_peer(OWNER_ID[0]) OWNER = OWNER_ID[0] except: OWNER = None out = private_panel(_, app.username, OWNER) if config.START_IMG_URL: try: await message.reply_photo( photo=config.START_IMG_URL, caption=_["start_2"].format( config.MUSIC_BOT_NAME ), reply_markup=InlineKeyboardMarkup(out), ) except: await message.reply_text( _["start_2"].format(config.MUSIC_BOT_NAME), reply_markup=InlineKeyboardMarkup(out), ) else: await message.reply_text( _["start_2"].format(config.MUSIC_BOT_NAME), reply_markup=InlineKeyboardMarkup(out), ) if await is_on_off(config.LOG): sender_id = message.from_user.id sender_name = message.from_user.first_name return await app.send_message( config.LOG_GROUP_ID, f"{message.from_user.mention} has just started Bot.\n\n**USER ID:** {sender_id}\n**USER NAME:** {sender_name}", ) @app.on_message( filters.command(get_command("START_COMMAND")) & filters.group & ~BANNED_USERS ) @LanguageStart async def testbot(client, message: Message, _): out = start_pannel(_) return await message.reply_text( _["start_1"].format( message.chat.title, config.MUSIC_BOT_NAME ), reply_markup=InlineKeyboardMarkup(out), ) welcome_group = 2 @app.on_message(filters.new_chat_members, group=welcome_group) async def welcome(client, message: Message): chat_id = message.chat.id if config.PRIVATE_BOT_MODE == str(True): if not await is_served_private_chat(message.chat.id): await message.reply_text( "**Private Music Bot**\n\nOnly for authorized chats from the owner. Ask my owner to allow your chat first." ) return await app.leave_chat(message.chat.id) else: await add_served_chat(chat_id) for member in message.new_chat_members: try: language = await get_lang(message.chat.id) _ = get_string(language) if member.id == app.id: chat_type = message.chat.type if chat_type != "SUPERGROUP": await message.reply_text(_["start_6"]) return await app.leave_chat(message.chat.id) if chat_id in await blacklisted_chats(): await message.reply_text( _["start_7"].format( f"https://t.me/{app.username}?start=sudolist" ) ) return await app.leave_chat(chat_id) userbot = await get_assistant(message.chat.id) out = start_pannel(_) await message.reply_text( _["start_3"].format( config.MUSIC_BOT_NAME, userbot.username, userbot.id, ), reply_markup=InlineKeyboardMarkup(out), ) if member.id in config.OWNER_ID: return await message.reply_text( _["start_4"].format( config.MUSIC_BOT_NAME, member.mention ) ) if member.id in SUDOERS: return await message.reply_text( _["start_5"].format( config.MUSIC_BOT_NAME, member.mention ) ) return except: return