#
# Copyright (C) 2021-2022 by TeamYukki@Github, < https://github.com/TeamYukki >.
#
# This file is part of < https://github.com/TeamYukki/YukkiMusicBot > project,
# and is released under the "GNU v3.0 License Agreement".
# Please see < https://github.com/TeamYukki/YukkiMusicBot/blob/master/LICENSE >
#
# All rights reserved.

from typing import Optional

from pytgcalls.types.stream.legacy.quality import (HighQualityAudio,
                                                   HighQualityVideo,
                                                   LowQualityAudio,
                                                   LowQualityVideo,
                                                   MediumQualityAudio,
                                                   MediumQualityVideo)

import config
from config import PRIVATE_BOT_MODE
from YukkiMusic.core.mongo import mongodb

channeldb = mongodb.cplaymode
commanddb = mongodb.commands
cleandb = mongodb.cleanmode
playmodedb = mongodb.playmode
playtypedb = mongodb.playtypedb
langdb = mongodb.language
authdb = mongodb.adminauth
videodb = mongodb.yukkivideocalls
onoffdb = mongodb.onoffper
suggdb = mongodb.suggestion
autoenddb = mongodb.autoend

# In-memory state. Some of these mirror a MongoDB collection so that repeated
# lookups do not have to hit the database; others exist only in memory.
loop = {}
playtype = {}
playmode = {}
channelconnect = {}
langm = {}
pause = {}
mute = {}
audio = {}
video = {}
active = []
activevideo = []
command = []
cleanmode = []
nonadmin = {}
vlimit = []
maintenance = []
suggestion = {}
autoend = {}

# Fixed document keys used for the settings that are not per-chat.
_AUTOEND_KEY = 123
_VIDEO_LIMIT_KEY = 123456

# Quality name -> pytgcalls quality class.
_AUDIO_QUALITIES = {
    "High": HighQualityAudio,
    "Medium": MediumQualityAudio,
    "Low": LowQualityAudio,
}
_VIDEO_QUALITIES = {
    "High": HighQualityVideo,
    "Medium": MediumQualityVideo,
    "Low": LowQualityVideo,
}


# Auto End Stream


async def is_autoend() -> bool:
    """Return True when a document exists in ``autoenddb`` for the fixed key.

    The answer is cached in ``autoend``; the database is only queried when
    nothing has been cached yet (a cached ``False`` is honoured).
    """
    enabled = autoend.get(_AUTOEND_KEY)
    if enabled is None:
        record = await autoenddb.find_one({"chat_id": _AUTOEND_KEY})
        enabled = bool(record)
        autoend[_AUTOEND_KEY] = enabled
    return enabled


async def autoend_on():
    """Cache auto-end as enabled and insert its document if it is missing."""
    autoend[_AUTOEND_KEY] = True
    record = await autoenddb.find_one({"chat_id": _AUTOEND_KEY})
    if not record:
        return await autoenddb.insert_one({"chat_id": _AUTOEND_KEY})


async def autoend_off():
    """Cache auto-end as disabled and delete its document if it exists."""
    autoend[_AUTOEND_KEY] = False
    record = await autoenddb.find_one({"chat_id": _AUTOEND_KEY})
    if record:
        return await autoenddb.delete_one({"chat_id": _AUTOEND_KEY})


# SUGGESTION


async def is_suggestion(chat_id: int) -> bool:
    """Return True unless ``suggdb`` holds a document for ``chat_id``.

    A stored document marks the chat as having suggestions turned off.
    The answer is cached in ``suggestion``; a cached ``False`` is honoured
    without re-querying the database.
    """
    enabled = suggestion.get(chat_id)
    if enabled is None:
        record = await suggdb.find_one({"chat_id": chat_id})
        enabled = not record
        suggestion[chat_id] = enabled
    return enabled


async def suggestion_on(chat_id: int):
    """Cache suggestions as enabled and delete the chat's "off" document."""
    suggestion[chat_id] = True
    record = await suggdb.find_one({"chat_id": chat_id})
    if record:
        return await suggdb.delete_one({"chat_id": chat_id})


async def suggestion_off(chat_id: int):
    """Cache suggestions as disabled and insert the chat's "off" document."""
    suggestion[chat_id] = False
    record = await suggdb.find_one({"chat_id": chat_id})
    if not record:
        return await suggdb.insert_one({"chat_id": chat_id})


# LOOP PLAY


async def get_loop(chat_id: int) -> int:
    """Return the loop value stored in memory for ``chat_id`` (0 if unset)."""
    return loop.get(chat_id) or 0


async def set_loop(chat_id: int, mode: int):
    """Store the loop value for ``chat_id`` in memory only."""
    loop[chat_id] = mode


# Channel Play IDS


async def get_cmode(chat_id: int) -> Optional[int]:
    """Return the channel-play mode for ``chat_id``.

    Looks in the ``channelconnect`` cache first, then in ``channeldb``.
    Returns ``None`` (without caching anything) when no document exists.
    """
    mode = channelconnect.get(chat_id)
    if not mode:
        record = await channeldb.find_one({"chat_id": chat_id})
        if not record:
            return None
        mode = record["mode"]
        channelconnect[chat_id] = mode
    return mode


async def set_cmode(chat_id: int, mode: int):
    """Cache the channel-play mode and upsert it into ``channeldb``."""
    channelconnect[chat_id] = mode
    await channeldb.update_one(
        {"chat_id": chat_id}, {"$set": {"mode": mode}}, upsert=True
    )


# PLAY TYPE WHETHER ADMINS ONLY OR EVERYONE


async def get_playtype(chat_id: int) -> str:
    """Return the play type for ``chat_id``; defaults to ``"Everyone"``.

    The value read from ``playtypedb`` (or the default) is cached in
    ``playtype``.
    """
    mode = playtype.get(chat_id)
    if not mode:
        record = await playtypedb.find_one({"chat_id": chat_id})
        mode = record["mode"] if record else "Everyone"
        playtype[chat_id] = mode
    return mode


async def set_playtype(chat_id: int, mode: str):
    """Cache the play type and upsert it into ``playtypedb``."""
    playtype[chat_id] = mode
    await playtypedb.update_one(
        {"chat_id": chat_id}, {"$set": {"mode": mode}}, upsert=True
    )


# play mode whether inline or direct query


async def get_playmode(chat_id: int) -> str:
    """Return the play mode for ``chat_id``; defaults to ``"Direct"``.

    The value read from ``playmodedb`` (or the default) is cached in
    ``playmode``.
    """
    mode = playmode.get(chat_id)
    if not mode:
        record = await playmodedb.find_one({"chat_id": chat_id})
        mode = record["mode"] if record else "Direct"
        playmode[chat_id] = mode
    return mode


async def set_playmode(chat_id: int, mode: str):
    """Cache the play mode and upsert it into ``playmodedb``."""
    playmode[chat_id] = mode
    await playmodedb.update_one(
        {"chat_id": chat_id}, {"$set": {"mode": mode}}, upsert=True
    )


# language


async def get_lang(chat_id: int) -> str:
    """Return the language code for ``chat_id``; defaults to ``"en"``.

    The value read from ``langdb`` (or the default) is cached in ``langm``.
    """
    code = langm.get(chat_id)
    if not code:
        record = await langdb.find_one({"chat_id": chat_id})
        code = record["lang"] if record else "en"
        langm[chat_id] = code
    return code


async def set_lang(chat_id: int, lang: str):
    """Cache the language code and upsert it into ``langdb``."""
    langm[chat_id] = lang
    await langdb.update_one(
        {"chat_id": chat_id}, {"$set": {"lang": lang}}, upsert=True
    )


# Muted


async def is_muted(chat_id: int) -> bool:
    """Return the in-memory mute flag for ``chat_id`` (False if unset)."""
    return mute.get(chat_id) or False


async def mute_on(chat_id: int):
    """Set the in-memory mute flag for ``chat_id``."""
    mute[chat_id] = True


async def mute_off(chat_id: int):
    """Clear the in-memory mute flag for ``chat_id``."""
    mute[chat_id] = False


# Pause-Skip


async def is_music_playing(chat_id: int) -> bool:
    """Return the flag stored in ``pause`` for ``chat_id`` (False if unset).

    ``music_on`` stores True and ``music_off`` stores False.
    """
    return pause.get(chat_id) or False


async def music_on(chat_id: int):
    """Mark ``chat_id`` as playing (stores True in ``pause``)."""
    pause[chat_id] = True


async def music_off(chat_id: int):
    """Mark ``chat_id`` as not playing (stores False in ``pause``)."""
    pause[chat_id] = False


# Active Voice Chats


async def get_active_chats() -> list:
    """Return the live ``active`` list (not a copy)."""
    return active


async def is_active_chat(chat_id: int) -> bool:
    """Return True when ``chat_id`` is in the ``active`` list."""
    return chat_id in active


async def add_active_chat(chat_id: int):
    """Add ``chat_id`` to ``active`` unless it is already present."""
    if chat_id not in active:
        active.append(chat_id)


async def remove_active_chat(chat_id: int):
    """Remove ``chat_id`` from ``active`` if present."""
    if chat_id in active:
        active.remove(chat_id)


# Active Video Chats


async def get_active_video_chats() -> list:
    """Return the live ``activevideo`` list (not a copy)."""
    return activevideo


async def is_active_video_chat(chat_id: int) -> bool:
    """Return True when ``chat_id`` is in the ``activevideo`` list."""
    return chat_id in activevideo


async def add_active_video_chat(chat_id: int):
    """Add ``chat_id`` to ``activevideo`` unless it is already present."""
    if chat_id not in activevideo:
        activevideo.append(chat_id)


async def remove_active_video_chat(chat_id: int):
    """Remove ``chat_id`` from ``activevideo`` if present."""
    if chat_id in activevideo:
        activevideo.remove(chat_id)


# Delete command mode


async def is_commanddelete_on(chat_id: int) -> bool:
    """Return True unless ``chat_id`` is listed in ``command`` (opt-out list)."""
    return chat_id not in command


async def commanddelete_off(chat_id: int):
    """Add ``chat_id`` to the ``command`` opt-out list."""
    if chat_id not in command:
        command.append(chat_id)


async def commanddelete_on(chat_id: int):
    """Remove ``chat_id`` from the ``command`` opt-out list if present."""
    # Membership guard instead of a bare ``except`` around ``remove``.
    if chat_id in command:
        command.remove(chat_id)


# Clean Mode


async def is_cleanmode_on(chat_id: int) -> bool:
    """Return True unless ``chat_id`` is listed in ``cleanmode`` (opt-out list)."""
    return chat_id not in cleanmode


async def cleanmode_off(chat_id: int):
    """Add ``chat_id`` to the ``cleanmode`` opt-out list."""
    if chat_id not in cleanmode:
        cleanmode.append(chat_id)


async def cleanmode_on(chat_id: int):
    """Remove ``chat_id`` from the ``cleanmode`` opt-out list if present."""
    # Membership guard instead of a bare ``except`` around ``remove``.
    if chat_id in cleanmode:
        cleanmode.remove(chat_id)


# Non Admin Chat


async def check_nonadmin_chat(chat_id: int) -> bool:
    """Return True when ``authdb`` holds a document for ``chat_id`` (no cache)."""
    record = await authdb.find_one({"chat_id": chat_id})
    return bool(record)


async def is_nonadmin_chat(chat_id: int) -> bool:
    """Return True when ``chat_id`` has a document in ``authdb``.

    The answer is cached in ``nonadmin``; a cached ``False`` is honoured
    without re-querying the database.
    """
    flagged = nonadmin.get(chat_id)
    if flagged is None:
        record = await authdb.find_one({"chat_id": chat_id})
        flagged = bool(record)
        nonadmin[chat_id] = flagged
    return flagged


async def add_nonadmin_chat(chat_id: int):
    """Cache ``chat_id`` as non-admin and insert its document if missing."""
    nonadmin[chat_id] = True
    if await check_nonadmin_chat(chat_id):
        return
    return await authdb.insert_one({"chat_id": chat_id})


async def remove_nonadmin_chat(chat_id: int):
    """Cache ``chat_id`` as not non-admin and delete its document if present."""
    nonadmin[chat_id] = False
    if not await check_nonadmin_chat(chat_id):
        return
    return await authdb.delete_one({"chat_id": chat_id})


# Video Limit


async def _load_video_limit():
    """Return the video-chat limit, filling the ``vlimit`` cache on a miss.

    Falls back to ``config.VIDEO_STREAM_LIMIT`` when ``videodb`` has no
    document for the fixed key.
    """
    if vlimit:
        return vlimit[0]
    record = await videodb.find_one({"chat_id": _VIDEO_LIMIT_KEY})
    limit = record["limit"] if record else config.VIDEO_STREAM_LIMIT
    # Clear after the await: another coroutine may have filled the cache.
    vlimit.clear()
    vlimit.append(limit)
    return limit


async def is_video_allowed(chat_idd) -> bool:
    """Return whether ``chat_idd`` may run a video chat under the limit.

    A limit of 0 disallows video entirely. Otherwise a chat that is already
    in ``activevideo`` is always allowed, and a new chat is allowed only
    while the number of active video chats is below the limit.
    """
    limit = await _load_video_limit()
    if limit == 0:
        return False
    count = len(await get_active_video_chats())
    # ``>=`` rather than ``==``: the count can exceed the limit (e.g. after
    # the limit is lowered), and new chats must still be refused then.
    if count >= int(limit) and not await is_active_video_chat(chat_idd):
        return False
    return True


async def get_video_limit() -> int:
    """Return the current video-chat limit (cached, DB, or config default)."""
    return await _load_video_limit()


async def set_video_limit(limt: int):
    """Cache the new limit and upsert it into ``videodb``."""
    vlimit.clear()
    vlimit.append(limt)
    return await videodb.update_one(
        {"chat_id": _VIDEO_LIMIT_KEY}, {"$set": {"limit": limt}}, upsert=True
    )


# On Off


async def is_on_off(on_off: int) -> bool:
    """Return True when ``onoffdb`` holds a document with this ``on_off``."""
    record = await onoffdb.find_one({"on_off": on_off})
    return bool(record)


async def add_on(on_off: int):
    """Insert the ``on_off`` document unless it already exists."""
    if await is_on_off(on_off):
        return
    return await onoffdb.insert_one({"on_off": on_off})


async def add_off(on_off: int):
    """Delete the ``on_off`` document if it exists."""
    if not await is_on_off(on_off):
        return
    return await onoffdb.delete_one({"on_off": on_off})


# Maintenance


async def is_maintenance():
    """Return False when the maintenance flag is set, True otherwise.

    NOTE(review): the return value is the inverse of what the name suggests
    (``maintenance_on`` stores 1, and 1 yields False here). Kept as-is
    because callers presumably rely on it -- confirm before changing.

    ``maintenance`` caches 1 (document ``{"on_off": 1}`` exists) or 2 (it
    does not); the database is only consulted while the cache is empty.
    """
    if not maintenance:
        record = await onoffdb.find_one({"on_off": 1})
        maintenance.clear()
        maintenance.append(1 if record else 2)
    return 1 not in maintenance


async def maintenance_off():
    """Cache state 2 and delete the ``{"on_off": 1}`` document if present."""
    maintenance.clear()
    maintenance.append(2)
    if not await is_on_off(1):
        return
    return await onoffdb.delete_one({"on_off": 1})


async def maintenance_on():
    """Cache state 1 and insert the ``{"on_off": 1}`` document if missing."""
    maintenance.clear()
    maintenance.append(1)
    if await is_on_off(1):
        return
    return await onoffdb.insert_one({"on_off": 1})


# Audio Video Limit


def _default_video_quality_name() -> str:
    """Return the video quality name used when a chat has none stored."""
    return "High" if PRIVATE_BOT_MODE == str(True) else "Medium"


async def save_audio_bitrate(chat_id: int, bitrate: str):
    """Store the audio quality name for ``chat_id`` in memory only."""
    audio[chat_id] = bitrate


async def save_video_bitrate(chat_id: int, bitrate: str):
    """Store the video quality name for ``chat_id`` in memory only."""
    video[chat_id] = bitrate


async def get_aud_bit_name(chat_id: int) -> str:
    """Return the stored audio quality name, or ``"High"`` when unset."""
    return audio.get(chat_id) or "High"


async def get_vid_bit_name(chat_id: int) -> str:
    """Return the stored video quality name.

    When unset: ``"High"`` if ``PRIVATE_BOT_MODE`` equals ``"True"``,
    otherwise ``"Medium"``.
    """
    return video.get(chat_id) or _default_video_quality_name()


async def get_audio_bitrate(chat_id: int):
    """Return a new pytgcalls audio quality object for ``chat_id``.

    Falls back to ``MediumQualityAudio`` when nothing is stored or when the
    stored name is not one of "High"/"Medium"/"Low" (previously the latter
    case returned ``None``).
    """
    # NOTE(review): get_aud_bit_name reports "High" for an unset chat while
    # this returns medium quality; the mismatch is pre-existing and is left
    # untouched here.
    name = audio.get(chat_id)
    quality = _AUDIO_QUALITIES.get(str(name)) if name else None
    return (quality or MediumQualityAudio)()


async def get_video_bitrate(chat_id: int):
    """Return a new pytgcalls video quality object for ``chat_id``.

    Falls back to the default quality (high in private-bot mode, medium
    otherwise) when nothing is stored or when the stored name is not one of
    "High"/"Medium"/"Low" (previously the latter case returned ``None``).
    """
    name = video.get(chat_id)
    quality = _VIDEO_QUALITIES.get(str(name)) if name else None
    if quality is None:
        quality = _VIDEO_QUALITIES[_default_video_quality_name()]
    return quality()