ONesVC_bot / YukkiMusic /utils /database /memorydatabase.py
BinaryONe
Refresh Update
2f67506
#
# Copyright (C) 2021-2022 by TeamYukki@Github, < https://github.com/TeamYukki >.
#
# This file is part of < https://github.com/TeamYukki/YukkiMusicBot > project,
# and is released under the "GNU v3.0 License Agreement".
# Please see < https://github.com/TeamYukki/YukkiMusicBot/blob/master/LICENSE >
#
# All rights reserved.
import config
from config import PRIVATE_BOT_MODE
from YukkiMusic.core.mongo import mongodb
channeldb = mongodb.cplaymode
commanddb = mongodb.commands
cleandb = mongodb.cleanmode
playmodedb = mongodb.playmode
playtypedb = mongodb.playtypedb
langdb = mongodb.language
authdb = mongodb.adminauth
videodb = mongodb.yukkivideocalls
onoffdb = mongodb.onoffper
suggdb = mongodb.suggestion
autoenddb = mongodb.autoend
# Shifting to memory [ mongo sucks often]
loop = {}
playtype = {}
playmode = {}
channelconnect = {}
langm = {}
pause = {}
mute = {}
audio = {}
video = {}
active = []
activevideo = []
command = []
cleanmode = []
nonadmin = {}
vlimit = []
maintenance = []
suggestion = {}
autoend = {}
# Auto End Stream
async def is_autoend() -> bool:
chat_id = 123
mode = autoend.get(chat_id)
if not mode:
user = await autoenddb.find_one({"chat_id": chat_id})
if not user:
autoend[chat_id] = False
return False
autoend[chat_id] = True
return True
return mode
async def autoend_on():
chat_id = 123
autoend[chat_id] = True
user = await autoenddb.find_one({"chat_id": chat_id})
if not user:
return await autoenddb.insert_one({"chat_id": chat_id})
async def autoend_off():
chat_id = 123
autoend[chat_id] = False
user = await autoenddb.find_one({"chat_id": chat_id})
if user:
return await autoenddb.delete_one({"chat_id": chat_id})
# SUGGESTION
async def is_suggestion(chat_id: int) -> bool:
mode = suggestion.get(chat_id)
if not mode:
user = await suggdb.find_one({"chat_id": chat_id})
if not user:
suggestion[chat_id] = True
return True
suggestion[chat_id] = False
return False
return mode
async def suggestion_on(chat_id: int):
suggestion[chat_id] = True
user = await suggdb.find_one({"chat_id": chat_id})
if user:
return await suggdb.delete_one({"chat_id": chat_id})
async def suggestion_off(chat_id: int):
suggestion[chat_id] = False
user = await suggdb.find_one({"chat_id": chat_id})
if not user:
return await suggdb.insert_one({"chat_id": chat_id})
# LOOP PLAY
async def get_loop(chat_id: int) -> int:
lop = loop.get(chat_id)
if not lop:
return 0
return lop
async def set_loop(chat_id: int, mode: int):
loop[chat_id] = mode
# Channel Play IDS
async def get_cmode(chat_id: int) -> int:
mode = channelconnect.get(chat_id)
if not mode:
mode = await channeldb.find_one({"chat_id": chat_id})
if not mode:
return None
channelconnect[chat_id] = mode["mode"]
return mode["mode"]
return mode
async def set_cmode(chat_id: int, mode: int):
channelconnect[chat_id] = mode
await channeldb.update_one(
{"chat_id": chat_id}, {"$set": {"mode": mode}}, upsert=True
)
# PLAY TYPE WHETHER ADMINS ONLY OR EVERYONE
async def get_playtype(chat_id: int) -> str:
mode = playtype.get(chat_id)
if not mode:
mode = await playtypedb.find_one({"chat_id": chat_id})
if not mode:
playtype[chat_id] = "Everyone"
return "Everyone"
playtype[chat_id] = mode["mode"]
return mode["mode"]
return mode
async def set_playtype(chat_id: int, mode: str):
playtype[chat_id] = mode
await playtypedb.update_one(
{"chat_id": chat_id}, {"$set": {"mode": mode}}, upsert=True
)
# play mode whether inline or direct query
async def get_playmode(chat_id: int) -> str:
mode = playmode.get(chat_id)
if not mode:
mode = await playmodedb.find_one({"chat_id": chat_id})
if not mode:
playmode[chat_id] = "Direct"
return "Direct"
playmode[chat_id] = mode["mode"]
return mode["mode"]
return mode
async def set_playmode(chat_id: int, mode: str):
playmode[chat_id] = mode
await playmodedb.update_one(
{"chat_id": chat_id}, {"$set": {"mode": mode}}, upsert=True
)
# language
async def get_lang(chat_id: int) -> str:
mode = langm.get(chat_id)
if not mode:
lang = await langdb.find_one({"chat_id": chat_id})
if not lang:
langm[chat_id] = "en"
return "en"
langm[chat_id] = lang["lang"]
return lang["lang"]
return mode
async def set_lang(chat_id: int, lang: str):
langm[chat_id] = lang
await langdb.update_one(
{"chat_id": chat_id}, {"$set": {"lang": lang}}, upsert=True
)
# Muted
async def is_muted(chat_id: int) -> bool:
mode = mute.get(chat_id)
if not mode:
return False
return mode
async def mute_on(chat_id: int):
mute[chat_id] = True
async def mute_off(chat_id: int):
mute[chat_id] = False
# Pause-Skip
async def is_music_playing(chat_id: int) -> bool:
mode = pause.get(chat_id)
if not mode:
return False
return mode
async def music_on(chat_id: int):
pause[chat_id] = True
async def music_off(chat_id: int):
pause[chat_id] = False
# Active Voice Chats
async def get_active_chats() -> list:
return active
async def is_active_chat(chat_id: int) -> bool:
if chat_id not in active:
return False
else:
return True
async def add_active_chat(chat_id: int):
if chat_id not in active:
active.append(chat_id)
async def remove_active_chat(chat_id: int):
if chat_id in active:
active.remove(chat_id)
# Active Video Chats
async def get_active_video_chats() -> list:
return activevideo
async def is_active_video_chat(chat_id: int) -> bool:
if chat_id not in activevideo:
return False
else:
return True
async def add_active_video_chat(chat_id: int):
if chat_id not in activevideo:
activevideo.append(chat_id)
async def remove_active_video_chat(chat_id: int):
if chat_id in activevideo:
activevideo.remove(chat_id)
# Delete command mode
async def is_commanddelete_on(chat_id: int) -> bool:
if chat_id not in command:
return True
else:
return False
async def commanddelete_off(chat_id: int):
if chat_id not in command:
command.append(chat_id)
async def commanddelete_on(chat_id: int):
try:
command.remove(chat_id)
except:
pass
# Clean Mode
async def is_cleanmode_on(chat_id: int) -> bool:
if chat_id not in cleanmode:
return True
else:
return False
async def cleanmode_off(chat_id: int):
if chat_id not in cleanmode:
cleanmode.append(chat_id)
async def cleanmode_on(chat_id: int):
try:
cleanmode.remove(chat_id)
except:
pass
# Non Admin Chat
async def check_nonadmin_chat(chat_id: int) -> bool:
user = await authdb.find_one({"chat_id": chat_id})
if not user:
return False
return True
async def is_nonadmin_chat(chat_id: int) -> bool:
mode = nonadmin.get(chat_id)
if not mode:
user = await authdb.find_one({"chat_id": chat_id})
if not user:
nonadmin[chat_id] = False
return False
nonadmin[chat_id] = True
return True
return mode
async def add_nonadmin_chat(chat_id: int):
nonadmin[chat_id] = True
is_admin = await check_nonadmin_chat(chat_id)
if is_admin:
return
return await authdb.insert_one({"chat_id": chat_id})
async def remove_nonadmin_chat(chat_id: int):
nonadmin[chat_id] = False
is_admin = await check_nonadmin_chat(chat_id)
if not is_admin:
return
return await authdb.delete_one({"chat_id": chat_id})
# Video Limit
async def is_video_allowed(chat_idd) -> str:
chat_id = 123456
if not vlimit:
dblimit = await videodb.find_one({"chat_id": chat_id})
if not dblimit:
vlimit.clear()
vlimit.append(config.VIDEO_STREAM_LIMIT)
limit = config.VIDEO_STREAM_LIMIT
else:
limit = dblimit["limit"]
vlimit.clear()
vlimit.append(limit)
else:
limit = vlimit[0]
if limit == 0:
return False
count = len(await get_active_video_chats())
if int(count) == int(limit):
if not await is_active_video_chat(chat_idd):
return False
return True
async def get_video_limit() -> str:
chat_id = 123456
if not vlimit:
dblimit = await videodb.find_one({"chat_id": chat_id})
if not dblimit:
limit = config.VIDEO_STREAM_LIMIT
else:
limit = dblimit["limit"]
else:
limit = vlimit[0]
return limit
async def set_video_limit(limt: int):
chat_id = 123456
vlimit.clear()
vlimit.append(limt)
return await videodb.update_one(
{"chat_id": chat_id}, {"$set": {"limit": limt}}, upsert=True
)
# On Off
async def is_on_off(on_off: int) -> bool:
onoff = await onoffdb.find_one({"on_off": on_off})
if not onoff:
return False
return True
async def add_on(on_off: int):
is_on = await is_on_off(on_off)
if is_on:
return
return await onoffdb.insert_one({"on_off": on_off})
async def add_off(on_off: int):
is_off = await is_on_off(on_off)
if not is_off:
return
return await onoffdb.delete_one({"on_off": on_off})
# Maintenance
async def is_maintenance():
if not maintenance:
get = await onoffdb.find_one({"on_off": 1})
if not get:
maintenance.clear()
maintenance.append(2)
return True
else:
maintenance.clear()
maintenance.append(1)
return False
else:
if 1 in maintenance:
return False
else:
return True
async def maintenance_off():
maintenance.clear()
maintenance.append(2)
is_off = await is_on_off(1)
if not is_off:
return
return await onoffdb.delete_one({"on_off": 1})
async def maintenance_on():
maintenance.clear()
maintenance.append(1)
is_on = await is_on_off(1)
if is_on:
return
return await onoffdb.insert_one({"on_off": 1})
# Audio Video Limit
from pytgcalls.types.stream.legacy.quality import (HighQualityAudio,
HighQualityVideo,
LowQualityAudio,
LowQualityVideo,
MediumQualityAudio,
MediumQualityVideo)
async def save_audio_bitrate(chat_id: int, bitrate: str):
audio[chat_id] = bitrate
async def save_video_bitrate(chat_id: int, bitrate: str):
video[chat_id] = bitrate
async def get_aud_bit_name(chat_id: int) -> str:
mode = audio.get(chat_id)
if not mode:
return "High"
return mode
async def get_vid_bit_name(chat_id: int) -> str:
mode = video.get(chat_id)
if not mode:
if PRIVATE_BOT_MODE == str(True):
return "High"
else:
return "Medium"
return mode
async def get_audio_bitrate(chat_id: int) -> str:
mode = audio.get(chat_id)
if not mode:
return MediumQualityAudio()
if str(mode) == "High":
return HighQualityAudio()
elif str(mode) == "Medium":
return MediumQualityAudio()
elif str(mode) == "Low":
return LowQualityAudio()
async def get_video_bitrate(chat_id: int) -> str:
mode = video.get(chat_id)
if not mode:
if PRIVATE_BOT_MODE == str(True):
return HighQualityVideo()
else:
return MediumQualityVideo()
if str(mode) == "High":
return HighQualityVideo()
elif str(mode) == "Medium":
return MediumQualityVideo()
elif str(mode) == "Low":
return LowQualityVideo()