| from importlib import import_module | |
| from aiofiles import open as aiopen | |
| from aiofiles.os import path as aiopath | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from pymongo.errors import PyMongoError | |
| from pymongo.server_api import ServerApi | |
| from ... import LOGGER, qbit_options, rss_dict, user_data | |
| from ...core.config_manager import Config | |
| from ...core.tg_client import TgClient | |
class DbManager:
    """Async persistence layer that stores bot state in MongoDB through Motor.

    All data lives in the ``wzmlx`` database. Every public coroutine checks
    ``self._return`` first and does nothing while it is true, so callers can
    invoke these methods unconditionally even when no database is connected.
    """

    # Per-user keys that ``update_user_data`` never overwrites: they are
    # stripped from the in-memory data before writing and copied over from
    # the document already stored in the database instead.
    _USER_FILE_KEYS = (
        "THUMBNAIL",
        "RCLONE_CONFIG",
        "TOKEN_PICKLE",
        "USER_COOKIE_FILE",
    )

    def __init__(self):
        # True while database access must be skipped (not connected / failed).
        self._return = True
        # The Motor client, or None when there is no open client.
        self._conn = None
        # Handle to the ``wzmlx`` database, or None when unavailable.
        self.db = None

    async def _close_client(self):
        """Close the current client whether its ``close()`` is sync or async.

        Motor documents ``AsyncIOMotorClient.close()`` as a plain method, so
        awaiting its ``None`` result raises ``TypeError``. Other async drivers
        expose a coroutine instead; awaiting only when the result is awaitable
        supports both.
        """
        from inspect import isawaitable

        outcome = self._conn.close()
        if isawaitable(outcome):
            await outcome

    async def connect(self):
        """Open (or re-open) the client and enable database access.

        Any existing client is closed first. On a ``PyMongoError`` the error
        is logged and the manager is put back into its disabled state.
        """
        try:
            if self._conn is not None:
                await self._close_client()
            self._conn = AsyncIOMotorClient(
                Config.DATABASE_URL, server_api=ServerApi("1")
            )
            self.db = self._conn.wzmlx
            self._return = False
        except PyMongoError as e:
            LOGGER.error(f"Error in DB connection: {e}")
            self.db = None
            self._return = True
            self._conn = None

    async def disconnect(self):
        """Disable database access and close the client if one is open."""
        self._return = True
        if self._conn is not None:
            await self._close_client()
            self._conn = None

    async def update_deploy_config(self):
        """Replace the stored deploy config with the current ``config`` module.

        Every module attribute whose name does not start with ``__`` is saved;
        string values are stripped of surrounding whitespace.
        """
        if self._return:
            return
        settings = import_module("config")
        config_file = {
            key: value.strip() if isinstance(value, str) else value
            for key, value in vars(settings).items()
            if not key.startswith("__")
        }
        await self.db.settings.deployConfig.replace_one(
            {"_id": TgClient.ID}, config_file, upsert=True
        )

    async def update_config(self, dict_):
        """Merge the key/value pairs of ``dict_`` into the stored config."""
        if self._return:
            return
        await self.db.settings.config.update_one(
            {"_id": TgClient.ID}, {"$set": dict_}, upsert=True
        )

    async def update_aria2(self, key, value):
        """Set a single ``key`` to ``value`` in the stored aria2c settings."""
        if self._return:
            return
        await self.db.settings.aria2c.update_one(
            {"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True
        )

    async def update_qbittorrent(self, key, value):
        """Set a single ``key`` to ``value`` in the stored qBittorrent settings."""
        if self._return:
            return
        await self.db.settings.qbittorrent.update_one(
            {"_id": TgClient.ID}, {"$set": {key: value}}, upsert=True
        )

    async def save_qbit_settings(self):
        """Write every entry of the global ``qbit_options`` mapping to the DB."""
        if self._return:
            return
        await self.db.settings.qbittorrent.update_one(
            {"_id": TgClient.ID}, {"$set": qbit_options}, upsert=True
        )

    async def update_private_file(self, path):
        """Mirror the file at ``path`` into the DB, or remove it if it is gone.

        When the file exists its bytes are stored; saving ``config.py`` also
        refreshes the deploy config. When it does not exist, the stored field
        is unset.
        """
        if self._return:
            return
        # Dots are replaced before the path is used as a field name --
        # presumably because "." in a $set/$unset key denotes a nested path.
        db_path = path.replace(".", "__")
        if await aiopath.exists(path):
            # The file is only read, so open it read-only ("rb+" would fail
            # on files the process cannot write to).
            async with aiopen(path, "rb") as pf:
                pf_bin = await pf.read()
            await self.db.settings.files.update_one(
                {"_id": TgClient.ID}, {"$set": {db_path: pf_bin}}, upsert=True
            )
            if path == "config.py":
                await self.update_deploy_config()
        else:
            await self.db.settings.files.update_one(
                {"_id": TgClient.ID}, {"$unset": {db_path: ""}}, upsert=True
            )

    async def update_nzb_config(self):
        """Store the bytes of ``sabnzbd/SABnzbd.ini`` in the database."""
        if self._return:
            return
        # Read-only access is sufficient here.
        async with aiopen("sabnzbd/SABnzbd.ini", "rb") as pf:
            nzb_conf = await pf.read()
        await self.db.settings.nzb.replace_one(
            {"_id": TgClient.ID}, {"SABnzbd__ini": nzb_conf}, upsert=True
        )

    async def update_user_data(self, user_id):
        """Replace the stored document of ``user_id`` with its in-memory data.

        The keys in ``_USER_FILE_KEYS`` are dropped from the in-memory copy
        and carried over from the existing stored document, so this call
        never overwrites or deletes them.
        """
        if self._return:
            return
        # Work on a copy so the shared ``user_data`` entry is not mutated.
        data = user_data.get(user_id, {}).copy()
        for key in self._USER_FILE_KEYS:
            data.pop(key, None)
        preserved_fields = {
            "$arrayToObject": {
                "$filter": {
                    "input": {"$objectToArray": "$$ROOT"},
                    "as": "field",
                    "cond": {"$in": ["$$field.k", list(self._USER_FILE_KEYS)]},
                }
            }
        }
        pipeline = [
            {
                "$replaceRoot": {
                    "newRoot": {
                        "$mergeObjects": [
                            # $literal keeps user values that start with "$"
                            # from being parsed as field paths or operators.
                            {"$literal": data},
                            preserved_fields,
                        ]
                    }
                }
            }
        ]
        await self.db.users[TgClient.ID].update_one(
            {"_id": user_id}, pipeline, upsert=True
        )

    async def update_user_doc(self, user_id, key, path=""):
        """Store a file's bytes under ``key`` for ``user_id``, or unset ``key``.

        With a non-empty ``path`` the file content is saved; with the default
        empty ``path`` the field is removed from the user's document.
        """
        if self._return:
            return
        if path:
            # Read-only access is sufficient here.
            async with aiopen(path, "rb") as doc:
                doc_bin = await doc.read()
            await self.db.users[TgClient.ID].update_one(
                {"_id": user_id}, {"$set": {key: doc_bin}}, upsert=True
            )
        else:
            await self.db.users[TgClient.ID].update_one(
                {"_id": user_id}, {"$unset": {key: ""}}, upsert=True
            )

    async def rss_update_all(self):
        """Write every entry of the global ``rss_dict`` to the database."""
        if self._return:
            return
        # Iterate over a snapshot of the keys: the awaits below yield control,
        # and the dict must not change size under a live iterator.
        for user_id in list(rss_dict):
            await self.db.rss[TgClient.ID].replace_one(
                {"_id": user_id}, rss_dict[user_id], upsert=True
            )

    async def rss_update(self, user_id):
        """Write the ``rss_dict`` entry of ``user_id`` to the database."""
        if self._return:
            return
        await self.db.rss[TgClient.ID].replace_one(
            {"_id": user_id}, rss_dict[user_id], upsert=True
        )

    async def rss_delete(self, user_id):
        """Delete the stored RSS document of ``user_id``."""
        if self._return:
            return
        await self.db.rss[TgClient.ID].delete_one({"_id": user_id})

    async def add_incomplete_task(self, cid, link, tag):
        """Record an unfinished task, keyed by ``link``, with its ``cid``/``tag``."""
        if self._return:
            return
        await self.db.tasks[TgClient.ID].insert_one(
            {"_id": link, "cid": cid, "tag": tag}
        )

    async def get_pm_uids(self):
        """Return the ``_id`` of every stored PM user as a list.

        Returns ``None`` (not an empty list) while database access is disabled.
        """
        if self._return:
            return
        return [doc["_id"] async for doc in self.db.pm_users[TgClient.ID].find({})]

    async def set_pm_users(self, user_id):
        """Store ``user_id`` as a PM user if it is not stored yet, and log it."""
        if self._return:
            return
        if await self.db.pm_users[TgClient.ID].find_one({"_id": user_id}) is None:
            await self.db.pm_users[TgClient.ID].insert_one({"_id": user_id})
            LOGGER.info(f"New PM User Added : {user_id}")

    async def rm_pm_user(self, user_id):
        """Remove ``user_id`` from the stored PM users."""
        if self._return:
            return
        await self.db.pm_users[TgClient.ID].delete_one({"_id": user_id})

    async def rm_complete_task(self, link):
        """Delete the unfinished-task record keyed by ``link``."""
        if self._return:
            return
        await self.db.tasks[TgClient.ID].delete_one({"_id": link})

    async def get_incomplete_tasks(self):
        """Return all recorded unfinished tasks and drop the task collection.

        The result maps ``cid -> tag -> [link, ...]``. An empty dict is
        returned when database access is disabled or nothing is recorded.
        """
        notifier_dict = {}
        if self._return:
            return notifier_dict
        tasks = self.db.tasks[TgClient.ID]
        # A single scan is enough: an empty collection simply yields no rows,
        # so no separate existence query is needed.
        async for row in tasks.find({}):
            by_tag = notifier_dict.setdefault(row["cid"], {})
            by_tag.setdefault(row["tag"], []).append(row["_id"])
        await tasks.drop()
        return notifier_dict

    async def trunc_table(self, name):
        """Drop this bot's sub-collection of the collection called ``name``."""
        if self._return:
            return
        await self.db[name][TgClient.ID].drop()
# Module-level DbManager instance. It starts with database access disabled
# (every operation is a no-op) until its connect() coroutine succeeds.
database = DbManager()